loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556286285



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,22 +72,25 @@
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String UNDERLINE = "_";
+  private static final String INSTANT_GENERATE_FOLDER_NAME = 
"instant_generate_tmp";
+  private transient boolean isMain = false;
+  private transient volatile long batchSize = 0L;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws 
Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      batchSize++;
     }
   }
 
   @Override
   public void open() throws Exception {
     super.open();
+    isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;

Review comment:
       called in open and  initializeState method,  initializeState method 
start before open method . So I removed the code in open method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to