xushiyan commented on code in PR #5269:
URL: https://github.com/apache/hudi/pull/5269#discussion_r993354807


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java:
##########
@@ -39,8 +39,9 @@ public BaseCompactor(BaseHoodieWriteClient<T, I, K, O> compactionClient) {
 
   public abstract void compact(HoodieInstant instant) throws IOException;
 
-  public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
-    this.compactionClient = writeClient;
+  public void close() {

Review Comment:
   ditto



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java:
##########
@@ -40,16 +40,15 @@ public BaseClusterer(BaseHoodieWriteClient<T, I, K, O> clusteringClient) {
 
   /**
    * Run clustering for the instant.
+   *
    * @param instant
    * @throws IOException
    */
   public abstract void cluster(HoodieInstant instant) throws IOException;
 
-  /**
-   * Update the write client used by async clustering.
-   * @param writeClient
-   */
-  public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
-    this.clusteringClient = writeClient;
+  public void close() {
+    if (clusteringClient != null) {
+      clusteringClient.close();
+    }

Review Comment:
   Let's implement AutoCloseable now that we have this close() API.
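A minimal sketch of what this suggestion could look like, using hypothetical stand-ins (FakeWriteClient for BaseHoodieWriteClient, SketchClusterer for BaseClusterer) rather than the real Hudi classes; cluster() is simplified to take a String instant and declare no checked exception. Declaring AutoCloseable lets callers use try-with-resources, so the client is closed even if cluster() throws.

```java
// Hypothetical stand-in for BaseHoodieWriteClient: only close() matters here.
class FakeWriteClient implements AutoCloseable {
  private boolean closed = false;

  @Override
  public void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

// Hypothetical clusterer that owns its client. Implementing AutoCloseable
// lets callers rely on try-with-resources instead of remembering close().
abstract class SketchClusterer implements AutoCloseable {
  protected final FakeWriteClient clusteringClient;

  SketchClusterer(FakeWriteClient clusteringClient) {
    this.clusteringClient = clusteringClient;
  }

  // Simplified signature; the real method takes a HoodieInstant.
  abstract void cluster(String instant);

  // Narrowed to throw nothing, so try-with-resources needs no catch block.
  @Override
  public void close() {
    if (clusteringClient != null) {
      clusteringClient.close();
    }
  }
}
```

The same shape would apply to BaseCompactor, which the "ditto" comment above refers to.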



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java:
##########
@@ -78,13 +81,26 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
 
         while (!isShutdownRequested()) {
           final HoodieInstant instant = fetchNextAsyncServiceInstant();
-
           if (null != instant) {
             LOG.info("Starting Compaction for instant " + instant);
+            synchronized (writeConfigUpdateLock) {

Review Comment:
   From my understanding of the previous comments, this still allows the async
service to receive write config updates? If we think of the async service as
coupled with a table service client and immutable, we shouldn't need this sort
of lock. We can just create a new async service at the DeltaSync level and
freeze the write config for the service internally.
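A sketch of the alternative described above, under stated assumptions: FrozenConfigService and ServiceOwner are hypothetical names, and a plain Map<String, String> stands in for HoodieWriteConfig. The service copies its config once and exposes no update method, so nothing like writeConfigUpdateLock is needed. The DeltaSync-level owner handles a config change by closing the old service and constructing a new one.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical async table service whose write config is frozen at construction.
// There is no update method, so no lock is needed to guard the config.
final class FrozenConfigService implements AutoCloseable {
  private final Map<String, String> writeConfig;
  private volatile boolean shutdownRequested = false;

  FrozenConfigService(Map<String, String> writeConfig) {
    // Defensive copy wrapped as unmodifiable: later changes by the caller are not seen.
    this.writeConfig = Collections.unmodifiableMap(new HashMap<>(writeConfig));
  }

  String configValue(String key) {
    return writeConfig.get(key);
  }

  boolean isShutdownRequested() {
    return shutdownRequested;
  }

  @Override
  public void close() {
    shutdownRequested = true;
  }
}

// Hypothetical DeltaSync-level owner: a config change replaces the service
// instead of mutating the running one.
final class ServiceOwner {
  private FrozenConfigService service;

  void onWriteConfigChange(Map<String, String> newConfig) {
    if (service != null) {
      service.close();
    }
    service = new FrozenConfigService(newConfig);
  }

  FrozenConfigService current() {
    return service;
  }
}
```

The trade-off is that each config change pays the cost of restarting the service, in exchange for a service whose state never changes while it runs.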



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -190,7 +192,49 @@ public static HoodieWriteConfig createHoodieConfig(String schemaStr, String base
 
   public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
                                                        String tblName, Map<String, String> parameters) {
-    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
+    return createHoodieClient(jssc, schemaStr, basePath, tblName, parameters, Option.empty());
+  }
+
+  public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
+                                                       String tblName, Map<String, String> parameters, Option<EmbeddedTimelineServiceHandler> embeddedTimelineServiceHandler) {
+    HoodieWriteConfig writeConfig = createHoodieConfig(schemaStr, basePath, tblName, parameters);
+    if (embeddedTimelineServiceHandler.isPresent())  {
+      embeddedTimelineServiceHandler.get().onInstantiation(writeConfig);
+    }
+    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), writeConfig, embeddedTimelineServiceHandler.isPresent()
+        ? embeddedTimelineServiceHandler.get().getEmbeddedTimelineService() : Option.empty());
+  }
+
+  /**
+   * For spark structured streaming ingestion, embedded timeline service is instantiated externally and re-used across various write client instantiations (regular writer, table services).
+   * This class helps in coordinating the instantiation of embedded timeline service and exposes the singleton instance.
+   */
+  public static class EmbeddedTimelineServiceHandler {
+
+    private Option<EmbeddedTimelineService> embeddedTimelineService = Option.empty();
+    private final JavaSparkContext jssc;
+
+    public EmbeddedTimelineServiceHandler(JavaSparkContext jssc) {
+      this.jssc = jssc;
+    }
+
+    public void onInstantiation(HoodieWriteConfig writeConfig) {
+      if (writeConfig.isEmbeddedTimelineServerEnabled()) {
+        if (!embeddedTimelineService.isPresent()) {
+          try {
+            embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), writeConfig);
+          } catch (IOException e) {
+            e.printStackTrace();

Review Comment:
   Avoid e.printStackTrace().
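One way to address this, sketched with hypothetical names (ServiceFactory stands in for EmbeddedTimelineServerHelper.createEmbeddedTimelineService, SharedServiceHandler for EmbeddedTimelineServiceHandler): wrap the IOException with context and rethrow it, so a failed timeline server start reaches the caller instead of silently leaving embeddedTimelineService empty. UncheckedIOException keeps the sketch on the JDK; in the actual codebase a Hudi exception type such as HoodieIOException may be the better fit. Making getOrCreate() synchronized is an extra assumption, since the handler is shared by several write clients.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical factory standing in for
// EmbeddedTimelineServerHelper.createEmbeddedTimelineService(...).
@FunctionalInterface
interface ServiceFactory<S> {
  S create() throws IOException;
}

// Hypothetical handler: creates the shared service at most once and
// propagates creation failures instead of calling printStackTrace().
final class SharedServiceHandler<S> {
  private final ServiceFactory<S> factory;
  private Optional<S> service = Optional.empty();

  SharedServiceHandler(ServiceFactory<S> factory) {
    this.factory = factory;
  }

  // synchronized so that concurrent callers cannot both create a service.
  synchronized S getOrCreate() {
    if (!service.isPresent()) {
      try {
        service = Optional.of(factory.create());
      } catch (IOException e) {
        // Keep the original cause and add context; the caller decides what to do.
        throw new UncheckedIOException("Failed to start embedded timeline service", e);
      }
    }
    return service.get();
  }
}
```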



-- 
This is an automated message from the Apache Git Service.