danny0405 commented on a change in pull request #3516:
URL: https://github.com/apache/hudi/pull/3516#discussion_r693307276
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
##########
@@ -68,7 +79,7 @@
* <p>The output records should then shuffle by the recordKey and thus do
scalable write.
*/
public class BootstrapFunction<I, O extends HoodieRecord>
- extends ProcessFunction<I, O> {
+ extends AbstractStreamOperator<O> implements OneInputStreamOperator<I, O> {
Review comment:
BootstrapFunction -> BootstrapOperator
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
##########
@@ -193,22 +205,22 @@ protected void loadRecords(String partitionPath,
Collector<O> out) throws Except
}
for (HoodieKey hoodieKey : hoodieKeys) {
- out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey,
fileSlice)));
+ output.collect(new StreamRecord(new
IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
}
});
// load avro log records
List<String> logPaths = fileSlice.getLogFiles()
- // filter out crushed files
- .filter(logFile -> isValidFile(logFile.getFileStatus()))
- .map(logFile -> logFile.getPath().toString())
- .collect(toList());
+ // filter out crushed files
+ .filter(logFile -> isValidFile(logFile.getFileStatus()))
+ .map(logFile -> logFile.getPath().toString())
Review comment:
We can do that in a following PR.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
##########
@@ -79,65 +90,63 @@
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
protected transient HoodieWriteConfig writeConfig;
- private GlobalAggregateManager aggregateManager;
-
+ private transient ListState<String> instantState;
private final Pattern pattern;
- private boolean alreadyBootstrap;
+ private String lastInstantTime;
+ private HoodieFlinkWriteClient writeClient;
+ private String actionType;
public BootstrapFunction(Configuration conf) {
this.conf = conf;
this.pattern =
Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.hadoopConf = StreamerUtil.getHadoopConf();
- this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
- this.hoodieTable = getTable();
- this.aggregateManager = ((StreamingRuntimeContext)
getRuntimeContext()).getGlobalAggregateManager();
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ lastInstantTime = this.writeClient.getLastPendingInstant(this.actionType);
+ instantState.update(Collections.singletonList(lastInstantTime));
}
@Override
- @SuppressWarnings("unchecked")
- public void processElement(I value, Context ctx, Collector<O> out) throws
Exception {
- if (!alreadyBootstrap) {
- String basePath = hoodieTable.getMetaClient().getBasePath();
- int taskID = getRuntimeContext().getIndexOfThisSubtask();
- LOG.info("Start loading records in table {} into the index state, taskId
= {}", basePath, taskID);
- for (String partitionPath :
FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf),
basePath)) {
- if (pattern.matcher(partitionPath).matches()) {
- loadRecords(partitionPath, out);
- }
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ ListStateDescriptor<String> instantStateDescriptor = new
ListStateDescriptor<>(
+ "instantStateDescriptor",
+ TypeInformation.of(new TypeHint<String>() {
+ })
Review comment:
Types.STRING()
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
##########
@@ -193,22 +205,22 @@ protected void loadRecords(String partitionPath,
Collector<O> out) throws Except
}
for (HoodieKey hoodieKey : hoodieKeys) {
- out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey,
fileSlice)));
+ output.collect(new StreamRecord(new
IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
}
});
// load avro log records
List<String> logPaths = fileSlice.getLogFiles()
- // filter out crushed files
- .filter(logFile -> isValidFile(logFile.getFileStatus()))
- .map(logFile -> logFile.getPath().toString())
- .collect(toList());
+ // filter out crushed files
+ .filter(logFile -> isValidFile(logFile.getFileStatus()))
+ .map(logFile -> logFile.getPath().toString())
Review comment:
We can improve the code to load the keys incrementally — that is, only the new
keys since the last committed instant.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
##########
@@ -68,7 +79,7 @@
* <p>The output records should then shuffle by the recordKey and thus do
scalable write.
*/
public class BootstrapFunction<I, O extends HoodieRecord>
- extends ProcessFunction<I, O> {
+ extends AbstractStreamOperator<O> implements OneInputStreamOperator<I, O> {
Review comment:
Also fix the class doc, ditto ` BatchBootstrapFunction`.
##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
##########
@@ -238,6 +238,7 @@ public void testMergeOnReadWriteWithCompaction() throws
Exception {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.MERGE_ON_READ.name());
+ conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
Review comment:
Why do we turn on the bootstrap here?
##########
File path:
hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
##########
@@ -70,7 +70,7 @@ public CompactFunctionWrapper(Configuration conf) throws
Exception {
.build();
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0,
environment);
this.conf = conf;
- this.functionInitializationContext = new
MockFunctionInitializationContext();
+ this.functionInitializationContext = new MockStateInitializationContext();
}
Review comment:
Maybe remove the `this.functionInitializationContext` field, because it is
unused.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]