This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e3675fe9b0 [HUDI-4372] Enable metadata table by default for flink 
(#6066)
e3675fe9b0 is described below

commit e3675fe9b014f0d6c24caeaa23578a54e194e92c
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jul 20 16:10:19 2022 +0800

    [HUDI-4372] Enable metadata table by default for flink (#6066)
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 36 +++++++++++++++++-----
 .../hudi/common/config/HoodieMetadataConfig.java   |  2 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  4 +--
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 18 ++++++-----
 .../hudi/source/stats/ExpressionEvaluator.java     |  4 +--
 .../sink/compact/ITTestHoodieFlinkCompactor.java   |  4 +++
 .../apache/hudi/table/format/TestInputFormat.java  |  2 ++
 .../org/apache/hudi/utils/TestCompactionUtil.java  |  6 ++++
 8 files changed, 57 insertions(+), 19 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index dce9245860..80bcf3adf3 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -96,7 +96,7 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
   /**
    * Cached metadata writer for coordinator to reuse for each commit.
    */
-  private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = 
Option.empty();
+  private HoodieBackedTableMetadataWriter metadataWriter;
 
   public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig 
writeConfig) {
     super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
@@ -264,10 +264,17 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
 
   @Override
   protected void writeTableMetadata(HoodieTable table, String instantTime, 
String actionType, HoodieCommitMetadata metadata) {
-    this.metadataWriterOption.ifPresent(w -> {
-      w.initTableMetadata(); // refresh the timeline
-      w.update(metadata, instantTime, 
getHoodieTable().isTableServiceAction(actionType));
-    });
+    if (this.metadataWriter == null) {
+      initMetadataWriter();
+    }
+    // refresh the timeline
+
+    // Note: the data meta client is not refreshed currently, some code path
+    // relies on the meta client for resolving the latest data schema,
+    // the schema expects to be immutable for SQL jobs but may be not for 
non-SQL
+    // jobs.
+    this.metadataWriter.initTableMetadata();
+    this.metadataWriter.update(metadata, instantTime, 
getHoodieTable().isTableServiceAction(actionType));
   }
 
   /**
@@ -275,9 +282,24 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
    * from the filesystem if it does not exist.
    */
   public void initMetadataWriter() {
-    HoodieBackedTableMetadataWriter metadataWriter = 
(HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+    this.metadataWriter = (HoodieBackedTableMetadataWriter) 
FlinkHoodieBackedTableMetadataWriter.create(
         FlinkClientUtil.getHadoopConf(), this.config, 
HoodieFlinkEngineContext.DEFAULT);
-    this.metadataWriterOption = Option.of(metadataWriter);
+  }
+
+  /**
+   * Initialized the metadata table on start up, should only be called once on 
driver.
+   */
+  public void initMetadataTable() {
+    HoodieFlinkTable<?> table = getHoodieTable();
+    if (config.isMetadataTableEnabled()) {
+      // initialize the metadata table path
+      initMetadataWriter();
+      // clean the obsolete index stats
+      table.deleteMetadataIndexIfNecessary();
+    } else {
+      // delete the metadata table if it was enabled but is now disabled
+      table.maybeDeleteMetadataTable();
+    }
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 14a055cb17..2cd08b9ae9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -441,9 +441,9 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
 
     private boolean getDefaultMetadataEnable(EngineType engineType) {
       switch (engineType) {
+        case FLINK:
         case SPARK:
           return ENABLE.defaultValue();
-        case FLINK:
         case JAVA:
           return false;
         default:
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b1a3372e08..7425540de9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -103,8 +103,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Boolean> METADATA_ENABLED = ConfigOptions
       .key("metadata.enabled")
       .booleanType()
-      .defaultValue(false)
-      .withDescription("Enable the internal metadata table which serves table 
metadata like level file listings, default false");
+      .defaultValue(true)
+      .withDescription("Enable the internal metadata table which serves table 
metadata like level file listings, default enabled");
 
   public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS 
= ConfigOptions
       .key("metadata.compaction.delta_commits")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index b726b02cad..d5ca307a00 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -51,6 +51,7 @@ import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
@@ -181,8 +182,10 @@ public class StreamWriteOperatorCoordinator
     this.gateways = new SubtaskGateway[this.parallelism];
     // init table, create if not exists.
     this.metaClient = initTableIfNotExists(this.conf);
+    this.ckpMetadata = initCkpMetadata(this.metaClient);
     // the write client must create after the table creation
     this.writeClient = StreamerUtil.createWriteClient(conf);
+    initMetadataTable(this.writeClient);
     this.tableState = TableState.create(conf);
     // start the executor
     this.executor = NonThrownExecutor.builder(LOG)
@@ -192,11 +195,6 @@ public class StreamWriteOperatorCoordinator
     if (tableState.syncHive) {
       initHiveSync();
     }
-    if (tableState.syncMetadata) {
-      initMetadataSync();
-    }
-    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), 
metaClient.getBasePath());
-    this.ckpMetadata.bootstrap(this.metaClient);
   }
 
   @Override
@@ -352,8 +350,14 @@ public class StreamWriteOperatorCoordinator
     hiveSyncContext.hiveSyncTool().syncHoodieTable();
   }
 
-  private void initMetadataSync() {
-    this.writeClient.initMetadataWriter();
+  private static void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) 
{
+    writeClient.initMetadataTable();
+  }
+
+  private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) 
throws IOException {
+    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), 
metaClient.getBasePath());
+    ckpMetadata.bootstrap(metaClient);
+    return ckpMetadata;
   }
 
   private void reset() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
index a0162856fa..efe0275267 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java
@@ -87,8 +87,8 @@ public class ExpressionEvaluator {
      * 2. bind the field reference;
      * 3. bind the column stats.
      *
-     * <p>Normalize the expression to simplify the following decision logic:
-     * always put the literal expression in the right.
+     * <p>Normalize the expression to simplify the subsequent decision logic:
+     * always put the literal expression in the RHS.
      */
     public static Evaluator bindCall(CallExpression call, RowData indexRow, 
RowType.RowField[] queryFields) {
       FunctionDefinition funDef = call.getFunctionDefinition();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 7e3b055add..5c763e055f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -102,6 +102,7 @@ public class ITTestHoodieFlinkCompactor {
     tableEnv.getConfig().getConfiguration()
         
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
     Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
@@ -214,6 +215,7 @@ public class ITTestHoodieFlinkCompactor {
     tableEnv.getConfig().getConfiguration()
         
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
     Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
     options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
@@ -222,6 +224,7 @@ public class ITTestHoodieFlinkCompactor {
     tableEnv.executeSql(hoodieTableDDL);
     tableEnv.executeSql(TestSQL.INSERT_T1).await();
 
+    // wait for the asynchronous commit to finish
     TimeUnit.SECONDS.sleep(3);
 
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -247,6 +250,7 @@ public class ITTestHoodieFlinkCompactor {
         + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
     tableEnv.executeSql(insertT1ForNewPartition).await();
 
+    // wait for the asynchronous commit to finish
     TimeUnit.SECONDS.sleep(3);
 
     compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, 
writeClient));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 3bd00634f7..b663a4af3e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -480,6 +480,8 @@ public class TestInputFormat {
     options.put(FlinkOptions.ARCHIVE_MIN_COMMITS.key(), "3");
     options.put(FlinkOptions.ARCHIVE_MAX_COMMITS.key(), "4");
     options.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "2");
+    // disable the metadata table to make the archiving behavior deterministic
+    options.put(FlinkOptions.METADATA_ENABLED.key(), "false");
     options.put("hoodie.commits.archival.batch", "1");
     beforeEach(HoodieTableType.COPY_ON_WRITE, options);
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 9559b8c8c5..87c8379d6a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -29,6 +29,7 @@ import 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkTables;
@@ -77,6 +78,11 @@ public class TestCompactionUtil {
 
     this.table = FlinkTables.createTable(conf);
     this.metaClient = table.getMetaClient();
+    // initialize the metadata table path
+    if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
+      FlinkHoodieBackedTableMetadataWriter.create(table.getHadoopConf(), 
table.getConfig(),
+          table.getContext(), Option.empty(), Option.empty());
+    }
   }
 
   @Test

Reply via email to