yanghua commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555703829



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -71,6 +71,7 @@
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = 
LogManager.getLogger(HoodieTableMetaClient.class);
+  public static final String INSTANT_GENERATE_FOLDER_NAME = "instant_generate";

Review comment:
       Would renaming this to `instant_generate_tmp` be better?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -202,7 +203,7 @@ public String getTempFolderPath() {
 
   /**
    * Returns Marker folder path.
-   * 

Review comment:
       This change is unnecessary; could you please revert it?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;

Review comment:
       `realGenerator`?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws 
Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new 
SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new 
FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new 
HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new 
FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new 
HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then 
throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + 
checkpointId + UNDERLINE + batchSize;

Review comment:
       Let's use `String.format()`?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws 
Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new 
SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new 
FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new 
HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);

Review comment:
       ditto?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;

Review comment:
       `recordCounter`?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -272,7 +273,7 @@ public HoodieWrapperFileSystem getFs() {
 
   /**
    * Return raw file-system.
-   * 

Review comment:
       ditto

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -420,6 +421,12 @@ public static HoodieTableMetaClient 
initTableAndGetMetaClient(Configuration hado
       }
     }
 
+    // Always create instantGenerateFolder which is needed for 
InstantGenerateOperator
+    final Path instantGenerateFolder = new Path(basePath, 
HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME);

Review comment:
       @wangxianghu Do we have a better place to put this change? It's in the 
common package.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -18,6 +18,18 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.flink.api.common.state.ListState;

Review comment:
       Let's revert the unnecessary changes. Please pay attention to the import order.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws 
Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new 
SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new 
FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new 
HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new 
FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new 
HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then 
throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + 
checkpointId + UNDERLINE + batchSize;
+    Path path = new 
Path(HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME,instantGenerateInfoFileName);
+    // mk generate file by each subtask
+    fs.create(path,true);
+    LOG.info("subtask [{}] at checkpoint [{}] created generate file 
[{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
+    if (isMain) {
+      boolean hasData = generateFilePasre(checkpointId);

Review comment:
       `receivedDataInCurrentCP`?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws 
Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new 
SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new 
FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);

Review comment:
       We may need to verify the config option to see if it's valid?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";

Review comment:
       `DELIMITER` sounds better?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
-
+  private static final String UNDERLINE = "_";
   private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws 
Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
     // get configs from runtimeContext
     cfg = (HoodieFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
-    // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
-    // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
     // hadoopConf
     serializableHadoopConf = new 
SerializableConfiguration(StreamerUtil.getHadoopConf());
 
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new 
FlinkTaskContextSupplier(null);
+    if (isMain) {
+      // retry times
+      retryTimes = Integer.valueOf(cfg.blockRetryTime);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new 
HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true);
+      // retry interval
+      retryInterval = Integer.valueOf(cfg.blockRetryInterval);
 
-    // init table, create it if not exists.
-    initTable();
+      TaskContextSupplier taskContextSupplier = new 
FlinkTaskContextSupplier(null);
+
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new 
HoodieFlinkEngineContext(taskContextSupplier), 
StreamerUtil.getHoodieClientConfig(cfg), true);
+
+      // init table, create it if not exists.
+      initTable();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then 
throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
+    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    String instantGenerateInfoFileName = indexOfThisSubtask + UNDERLINE + 
checkpointId + UNDERLINE + batchSize;
+    Path path = new 
Path(HoodieTableMetaClient.INSTANT_GENERATE_FOLDER_NAME,instantGenerateInfoFileName);
+    // mk generate file by each subtask
+    fs.create(path,true);
+    LOG.info("subtask [{}] at checkpoint [{}] created generate file 
[{}]",indexOfThisSubtask,checkpointId,instantGenerateInfoFileName);
+    if (isMain) {
+      boolean hasData = generateFilePasre(checkpointId);
+      // check whether the last instant is completed, if not, wait 10s and 
then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
+
+      // no data no new instant
+      if (hasData) {
+        latestInstant = startNewInstant(checkpointId);
+      }
+    }
+
+  }
+
+  private boolean generateFilePasre(long checkpointId) throws 
InterruptedException, IOException {

Review comment:
       `checkReceivedData`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to