loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r556286285
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -71,22 +72,25 @@
private String latestInstant = "";
private List<String> latestInstantList = new ArrayList<>(1);
private transient ListState<String> latestInstantState;
- private List<StreamRecord> bufferedRecords = new LinkedList();
- private transient ListState<StreamRecord> recordsState;
private Integer retryTimes;
private Integer retryInterval;
+ private static final String UNDERLINE = "_";
+ private static final String INSTANT_GENERATE_FOLDER_NAME =
"instant_generate_tmp";
+ private transient boolean isMain = false;
+ private transient volatile long batchSize = 0L;
@Override
public void processElement(StreamRecord<HoodieRecord> streamRecord) throws
Exception {
if (streamRecord.getValue() != null) {
- bufferedRecords.add(streamRecord);
output.collect(streamRecord);
+ batchSize++;
}
}
@Override
public void open() throws Exception {
super.open();
+ isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
Review comment:
called in open and initializeState method, initializeState method
start before open method . So I removed the code in open method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]