loukey-lj commented on a change in pull request #2434:
URL: https://github.com/apache/hudi/pull/2434#discussion_r555807220
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
private static final Logger LOG =
LoggerFactory.getLogger(InstantGenerateOperator.class);
public static final String NAME = "InstantGenerateOperator";
-
+ private static final String UNDERLINE = "_";
private HoodieFlinkStreamer.Config cfg;
private HoodieFlinkWriteClient writeClient;
private SerializableConfiguration serializableHadoopConf;
private transient FileSystem fs;
private String latestInstant = "";
private List<String> latestInstantList = new ArrayList<>(1);
private transient ListState<String> latestInstantState;
- private List<StreamRecord> bufferedRecords = new LinkedList();
- private transient ListState<StreamRecord> recordsState;
private Integer retryTimes;
private Integer retryInterval;
+ private transient boolean isMain = false;
+ private transient volatile long batchSize = 0;
@Override
public void processElement(StreamRecord<HoodieRecord> streamRecord) throws
Exception {
if (streamRecord.getValue() != null) {
- bufferedRecords.add(streamRecord);
output.collect(streamRecord);
+ batchSize++;
}
}
@Override
public void open() throws Exception {
super.open();
+ isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
// get configs from runtimeContext
cfg = (HoodieFlinkStreamer.Config)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
- // retry times
- retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
- // retry interval
- retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
// hadoopConf
serializableHadoopConf = new
SerializableConfiguration(StreamerUtil.getHadoopConf());
// Hadoop FileSystem
fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
- TaskContextSupplier taskContextSupplier = new
FlinkTaskContextSupplier(null);
+ if (isMain) {
+ // retry times
+ retryTimes = Integer.valueOf(cfg.blockRetryTime);
Review comment:
The config has a default value.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
##########
@@ -63,104 +63,159 @@
private static final Logger LOG =
LoggerFactory.getLogger(InstantGenerateOperator.class);
public static final String NAME = "InstantGenerateOperator";
-
+ private static final String UNDERLINE = "_";
private HoodieFlinkStreamer.Config cfg;
private HoodieFlinkWriteClient writeClient;
private SerializableConfiguration serializableHadoopConf;
private transient FileSystem fs;
private String latestInstant = "";
private List<String> latestInstantList = new ArrayList<>(1);
private transient ListState<String> latestInstantState;
- private List<StreamRecord> bufferedRecords = new LinkedList();
- private transient ListState<StreamRecord> recordsState;
private Integer retryTimes;
private Integer retryInterval;
+ private transient boolean isMain = false;
+ private transient volatile long batchSize = 0;
@Override
public void processElement(StreamRecord<HoodieRecord> streamRecord) throws
Exception {
if (streamRecord.getValue() != null) {
- bufferedRecords.add(streamRecord);
output.collect(streamRecord);
+ batchSize++;
}
}
@Override
public void open() throws Exception {
super.open();
+ isMain = getRuntimeContext().getIndexOfThisSubtask() == 0;
+
// get configs from runtimeContext
cfg = (HoodieFlinkStreamer.Config)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
- // retry times
- retryTimes = Integer.valueOf(cfg.blockRetryTime);
-
- // retry interval
- retryInterval = Integer.valueOf(cfg.blockRetryInterval);
-
// hadoopConf
serializableHadoopConf = new
SerializableConfiguration(StreamerUtil.getHadoopConf());
// Hadoop FileSystem
fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
- TaskContextSupplier taskContextSupplier = new
FlinkTaskContextSupplier(null);
+ if (isMain) {
+ // retry times
+ retryTimes = Integer.valueOf(cfg.blockRetryTime);
- // writeClient
- writeClient = new HoodieFlinkWriteClient(new
HoodieFlinkEngineContext(taskContextSupplier),
StreamerUtil.getHoodieClientConfig(cfg), true);
+ // retry interval
+ retryInterval = Integer.valueOf(cfg.blockRetryInterval);
Review comment:
The config has a default value.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]