nsivabalan commented on code in PR #10122:
URL: https://github.com/apache/hudi/pull/10122#discussion_r1399754308
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -29,37 +31,95 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.timeline.service.TimelineService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Timeline Service that runs as part of write client.
*/
public class EmbeddedTimelineService {
+ // lock used when starting/stopping/modifying embedded services
+ private static final Object SERVICE_LOCK = new Object();
private static final Logger LOG =
LoggerFactory.getLogger(EmbeddedTimelineService.class);
-
+ private static final AtomicInteger NUM_SERVERS_RUNNING = new
AtomicInteger(0);
+ // Map of port to existing timeline service running on that port
+ private static final Map<Integer, EmbeddedTimelineService> RUNNING_SERVICES
= new HashMap<>();
+ private static final Registry METRICS_REGISTRY =
Registry.getRegistry("TimelineService");
+ private static final String NUM_EMBEDDED_TIMELINE_SERVERS =
"numEmbeddedTimelineServers";
private int serverPort;
private String hostAddr;
- private HoodieEngineContext context;
+ private final HoodieEngineContext context;
private final SerializableConfiguration hadoopConf;
private final HoodieWriteConfig writeConfig;
- private final String basePath;
+ private TimelineService.Config serviceConfig;
+ private final Set<String> basePaths; // the set of base paths using this
EmbeddedTimelineService
private transient FileSystemViewManager viewManager;
private transient TimelineService server;
- public EmbeddedTimelineService(HoodieEngineContext context, String
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
+ private EmbeddedTimelineService(HoodieEngineContext context, String
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
setHostAddr(embeddedTimelineServiceHostAddr);
this.context = context;
this.writeConfig = writeConfig;
- this.basePath = writeConfig.getBasePath();
+ this.basePaths = new HashSet<>();
+ this.basePaths.add(writeConfig.getBasePath());
this.hadoopConf = context.getHadoopConf();
this.viewManager = createViewManager();
}
+ /**
+ * Returns an existing embedded timeline service if one is running for the
given configuration and reuse is enabled, or starts a new one.
+ * @param context The {@link HoodieEngineContext} for the client
+ * @param embeddedTimelineServiceHostAddr The host address to use for the
service (nullable)
+ * @param writeConfig The {@link HoodieWriteConfig} for the client
+ * @return A running {@link EmbeddedTimelineService}
+ * @throws IOException if an error occurs while starting the service
+ */
+ public static EmbeddedTimelineService
getOrStartEmbeddedTimelineService(HoodieEngineContext context, String
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) throws
IOException {
+ return getOrStartEmbeddedTimelineService(context,
embeddedTimelineServiceHostAddr, writeConfig, TimelineService::new);
+ }
+
+ static EmbeddedTimelineService
getOrStartEmbeddedTimelineService(HoodieEngineContext context, String
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig,
+
TimelineServiceCreator timelineServiceCreator) throws IOException {
+ // if reuse is enabled, check if any existing instances are compatible
+ if (writeConfig.isEmbeddedTimelineServerReuseEnabled()) {
+ synchronized (SERVICE_LOCK) {
+ for (EmbeddedTimelineService service : RUNNING_SERVICES.values()) {
+ if (service.canReuseFor(writeConfig,
embeddedTimelineServiceHostAddr)) {
+ service.addBasePath(writeConfig.getBasePath());
+ LOG.info("Reusing existing embedded timeline server with
configuration: " + service.serviceConfig);
+ return service;
+ }
+ }
+ // if no compatible instance is found, create a new one
+ EmbeddedTimelineService service = createAndStartService(context,
embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator);
+ RUNNING_SERVICES.put(service.serverPort, service);
Review Comment:
In fact, if we introduce an object, it becomes easier to check for the presence
of a timeline server: we can take its hash and store it, and `.containsKey()`
should suffice. As of now, we loop through all values.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]