yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r567513264
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
##########
@@ -26,9 +26,9 @@
*/
public class InsertBucket implements Serializable {
- int bucketNumber;
+ public int bucketNumber;
Review comment:
ditto
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
##########
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext
context, HoodieWriteConf
public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
HoodieEngineContext context,
HoodieTable<T,
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws
HoodieIndexException {
- return context.map(records, record -> {
- try {
- if (mapState.contains(record.getKey())) {
- record.unseal();
- record.setCurrentLocation(mapState.get(record.getKey()));
- record.seal();
- }
- } catch (Exception e) {
- LOG.error(String.format("Tag record location failed, key = %s, %s",
record.getRecordKey(), e.getMessage()));
- }
- return record;
- }, 0);
+ throw new UnsupportedOperationException("No need to tag location for
FlinkInMemoryStateIndex");
Review comment:
@wangxianghu Do you agree with this?
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -68,6 +70,9 @@ public static TypedProperties
appendKafkaProps(FlinkStreamerConfig config) {
}
public static TypedProperties getProps(FlinkStreamerConfig cfg) {
+ if (cfg.propsFilePath.equals("")) {
Review comment:
Use `String#isEmpty` here instead of comparing with `""`.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +259,37 @@ public static void checkRequiredProperties(TypedProperties
props, List<String> c
checkPropNames.forEach(prop ->
Preconditions.checkState(!props.containsKey(prop), "Required property
" + prop + " is missing"));
}
+
+ /**
+ * Initialize the table if it does not exist.
+ *
+ * @param conf the configuration
+ * @throws IOException if errors happens when writing metadata
+ */
+ public static void initTable(Configuration conf) throws IOException {
+ final String basePath = conf.getString(FlinkOptions.PATH);
+ final org.apache.hadoop.conf.Configuration hadoopConf =
StreamerUtil.getHadoopConf();
+ // Hadoop FileSystem
+ try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+ if (!fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))) {
+ HoodieTableMetaClient.initTableType(
+ hadoopConf,
+ basePath,
+ HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
+ conf.getString(FlinkOptions.TABLE_NAME),
+ "archived",
Review comment:
Extract it into a constant.
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
public void deletePendingInstant(String tableType, String instant) {
HoodieFlinkTable<T> table = HoodieFlinkTable.create(config,
(HoodieFlinkEngineContext) context);
String commitType =
CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
- table.getMetaClient().getActiveTimeline()
- .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED,
commitType, instant));
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().getActiveTimeline();
+ activeTimeline.deletePending(new
HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));
+ activeTimeline.deletePending(new
HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+ }
+
+ public void transitionRequestedToInflight(String tableType, String
inFlightInstant) {
Review comment:
`transition` is a noun; would `transform` sound better? I know there is already a
method named `HoodieActiveTimeline#transitionRequestedToInflight`, though.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +259,37 @@ public static void checkRequiredProperties(TypedProperties
props, List<String> c
checkPropNames.forEach(prop ->
Preconditions.checkState(!props.containsKey(prop), "Required property
" + prop + " is missing"));
}
+
+ /**
+ * Initialize the table if it does not exist.
+ *
+ * @param conf the configuration
+ * @throws IOException if errors happens when writing metadata
+ */
+ public static void initTable(Configuration conf) throws IOException {
+ final String basePath = conf.getString(FlinkOptions.PATH);
+ final org.apache.hadoop.conf.Configuration hadoopConf =
StreamerUtil.getHadoopConf();
+ // Hadoop FileSystem
+ try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+ if (!fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))) {
+ HoodieTableMetaClient.initTableType(
+ hadoopConf,
+ basePath,
+ HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
+ conf.getString(FlinkOptions.TABLE_NAME),
+ "archived",
+ conf.getString(FlinkOptions.PAYLOAD_CLASS),
+ 1);
+ LOG.info("Table initialized");
Review comment:
Could this log message include more information, e.g. the `basePath`?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
##########
@@ -19,15 +19,16 @@
package org.apache.hudi.table.action.commit;
import java.io.Serializable;
+import java.util.Objects;
/**
* Helper class for a bucket's type (INSERT and UPDATE) and its file location.
*/
public class BucketInfo implements Serializable {
- BucketType bucketType;
- String fileIdPrefix;
- String partitionPath;
+ public BucketType bucketType;
Review comment:
Can we use the getter/setter pattern instead of public fields?
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +259,37 @@ public static void checkRequiredProperties(TypedProperties
props, List<String> c
checkPropNames.forEach(prop ->
Preconditions.checkState(!props.containsKey(prop), "Required property
" + prop + " is missing"));
}
+
+ /**
+ * Initialize the table if it does not exist.
+ *
+ * @param conf the configuration
+ * @throws IOException if errors happens when writing metadata
+ */
+ public static void initTable(Configuration conf) throws IOException {
+ final String basePath = conf.getString(FlinkOptions.PATH);
Review comment:
IMO, `FlinkOptions.BASE_PATH` would be more readable.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]