This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 53ef39c8046 [HUDI-8508] Fix timeline service port assignment bug (#12241)
53ef39c8046 is described below
commit 53ef39c8046559e9065679d2d943c95d091e6f0e
Author: Tim Brown <[email protected]>
AuthorDate: Thu Nov 21 22:57:04 2024 -0600
[HUDI-8508] Fix timeline service port assignment bug (#12241)
* fix timeline service bug
* add test with basic server instead of another timeline service
---
.../hudi/timeline/service/TimelineService.java | 24 +++--
.../hudi/timeline/service/TestTimelineService.java | 108 +++++++++++++++++++++
2 files changed, 124 insertions(+), 8 deletions(-)
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 350afad591f..481fa02d1e6 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -33,6 +33,7 @@ import org.apache.hudi.storage.StorageConfiguration;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import io.javalin.Javalin;
+import io.javalin.core.util.JavalinBindException;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
@@ -344,18 +345,18 @@ public class TimelineService {
// Returns port to try when trying to bind a service. Handles wrapping and skipping privileged ports.
int tryPort = port == 0 ? port : (port + attempt - 1024) % (65536 - 1024) + 1024;
try {
+ createApp();
app.start(tryPort);
return app.port();
} catch (Exception e) {
- if (e.getMessage() != null && e.getMessage().contains("Failed to bind to")) {
+ if (e instanceof JavalinBindException) {
if (tryPort == 0) {
LOG.warn("Timeline server could not bind on a random free port.");
} else {
- LOG.warn(String.format("Timeline server could not bind on port %d. "
- + "Attempting port %d + 1.",tryPort, tryPort));
+ LOG.warn("Timeline server could not bind on port {}. Attempting port {} + 1.", tryPort, tryPort);
}
} else {
- LOG.warn(String.format("Timeline server start failed on port %d. Attempting port %d + 1.",tryPort, tryPort), e);
+ LOG.warn("Timeline server start failed on port {}. Attempting port {} + 1.", tryPort, tryPort, e);
}
}
}
@@ -363,6 +364,17 @@ public class TimelineService {
}
public int startService() throws IOException {
+ int realServerPort = startServiceOnPort(serverPort);
+ LOG.info("Starting Timeline server on port: {}", realServerPort);
+ this.serverPort = realServerPort;
+ return realServerPort;
+ }
+
+ private void createApp() throws IOException {
+ // if app needs to be recreated, stop the existing one
+ if (app != null) {
+ app.stop();
+ }
int maxThreads = timelineServerConf.numThreads > 0 ? timelineServerConf.numThreads : DEFAULT_NUM_THREADS;
QueuedThreadPool pool = new QueuedThreadPool(maxThreads, 8, 60_000);
pool.setDaemon(true);
@@ -381,10 +393,6 @@ public class TimelineService {
app, storageConf, timelineServerConf, context, storage, fsViewsManager);
app.get("/", ctx -> ctx.result("Hello Hudi"));
requestHandler.register();
- int realServerPort = startServiceOnPort(serverPort);
- LOG.info("Starting Timeline server on port :" + realServerPort);
- this.serverPort = realServerPort;
- return realServerPort;
}
public void run() throws IOException {
diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestTimelineService.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestTimelineService.java
new file mode 100644
index 00000000000..589610e4895
--- /dev/null
+++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestTimelineService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.mockito.Mockito.mock;
+
+class TestTimelineService {
+
+ @Test
+ void createServerUsesRandomPortIfAnotherTimelineServiceRunningOnSamePort() throws Exception {
+ TimelineService timelineService = null;
+ TimelineService secondTimelineService = null;
+ try {
+ StorageConfiguration<Configuration> conf = HoodieTestUtils.getDefaultStorageConf();
+ HoodieEngineContext engineContext = new HoodieLocalEngineContext(HoodieTestUtils.getDefaultStorageConf());
+ int originalServerPort = 8888;
+ TimelineService.Config config = TimelineService.Config.builder().enableMarkerRequests(true).serverPort(originalServerPort).build();
+ HoodieStorage storage = HoodieTestUtils.getDefaultStorage();
+ FileSystemViewManager viewManager = mock(FileSystemViewManager.class);
+ timelineService = new TimelineService(engineContext, conf, config, storage, viewManager);
+ assertEquals(originalServerPort, timelineService.startService());
+ // Create second service with the same configs
+ secondTimelineService = new TimelineService(engineContext, conf, config, storage, viewManager);
+ assertNotEquals(originalServerPort, secondTimelineService.startService());
+ } finally {
+ if (timelineService != null) {
+ timelineService.close();
+ }
+ if (secondTimelineService != null) {
+ secondTimelineService.close();
+ }
+ }
+ }
+
+ @Test
+ void createServerUsesRandomPortIfProvidedPortInUse() throws Exception {
+ TimelineService timelineService = null;
+ HttpServer server = null;
+ try {
+ int originalServerPort = 8888;
+ server = HttpServer.create(new InetSocketAddress(originalServerPort), 0);
+ server.createContext("/", new MyHandler());
+ server.setExecutor(null);
+ server.start();
+
+ TimelineService.Config config = TimelineService.Config.builder().enableMarkerRequests(true).serverPort(originalServerPort).build();
+ HoodieStorage storage = HoodieTestUtils.getDefaultStorage();
+ FileSystemViewManager viewManager = mock(FileSystemViewManager.class);
+ StorageConfiguration<Configuration> conf = HoodieTestUtils.getDefaultStorageConf();
+ HoodieEngineContext engineContext = new HoodieLocalEngineContext(HoodieTestUtils.getDefaultStorageConf());
+ timelineService = new TimelineService(engineContext, conf, config, storage, viewManager);
+ assertNotEquals(originalServerPort, timelineService.startService());
+ } finally {
+ if (timelineService != null) {
+ timelineService.close();
+ }
+ if (server != null) {
+ server.stop(0);
+ }
+ }
+ }
+
+ static class MyHandler implements HttpHandler {
+ @Override
+ public void handle(HttpExchange t) throws IOException {
+ String response = "Hello World!";
+ t.sendResponseHeaders(200, response.length());
+ try (OutputStream os = t.getResponseBody()) {
+ os.write(response.getBytes());
+ }
+ }
+ }
+}