This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new b87959cec529 fix: Cherry pick flink bug fixes for release 0.14.2 and 0.15.1 (#17782)
b87959cec529 is described below

commit b87959cec52911512932398ac02517055db47001
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Jan 6 11:24:12 2026 +0800

    fix: Cherry pick flink bug fixes for release 0.14.2 and 0.15.1 (#17782)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   7 +
 .../hudi/table/marker/WriteMarkersFactory.java     |   2 +-
 .../hudi/table/marker/TestWriteMarkersFactory.java |  17 ++-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |  24 ++++
 .../hudi/sink/clustering/ClusteringOperator.java   |   6 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |   8 +-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  30 +++++
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |  31 +++++
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java |   6 +
 .../sink/cluster/ITTestHoodieFlinkClustering.java  | 141 ++++++++++++++-------
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  17 +++
 .../apache/hudi/utils/TestFlinkWriteClients.java   |  35 +++++
 12 files changed, 276 insertions(+), 48 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index a369b748ee26..2419071d41de 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -48,6 +48,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -1415,6 +1416,12 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
   }
 
+  public boolean isRemoteViewStorageType() {
+    FileSystemViewStorageType storageType = getViewStorageConfig().getStorageType();
+    return storageType == FileSystemViewStorageType.REMOTE_ONLY
+        || storageType == FileSystemViewStorageType.REMOTE_FIRST;
+  }
+
   public int getEmbeddedTimelineServerPort() {
     return Integer.parseInt(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_PORT_NUM));
   }
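
For reference, a minimal standalone sketch of the check added above -- the enum
constants are real Hudi types, the wrapper class is hypothetical:

    import org.apache.hudi.common.table.view.FileSystemViewStorageType;

    // Replays HoodieWriteConfig#isRemoteViewStorageType for every storage type.
    public class RemoteViewStorageCheckSketch {
      static boolean isRemoteViewStorageType(FileSystemViewStorageType storageType) {
        return storageType == FileSystemViewStorageType.REMOTE_ONLY
            || storageType == FileSystemViewStorageType.REMOTE_FIRST;
      }

      public static void main(String[] args) {
        for (FileSystemViewStorageType type : FileSystemViewStorageType.values()) {
          System.out.println(type + " -> remote: " + isRemoteViewStorageType(type));
        }
      }
    }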
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
index c2317bf9bf89..14f0046fa027 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
@@ -46,7 +46,7 @@ public class WriteMarkersFactory {
       case DIRECT:
         return new DirectWriteMarkers(table, instantTime);
       case TIMELINE_SERVER_BASED:
-        if (!table.getConfig().isEmbeddedTimelineServerEnabled()) {
+        if (!table.getConfig().isEmbeddedTimelineServerEnabled() && !table.getConfig().isRemoteViewStorageType()) {
           LOG.warn("Timeline-server-based markers are configured as the marker type "
               + "but embedded timeline server is not enabled.  Falling back to direct markers.");
           return new DirectWriteMarkers(table, instantTime);
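
The net effect of the changed condition, as a hedged truth table (the helper
class is hypothetical; presumably a remote view storage type implies a
standalone timeline server that can serve the marker requests):

    // Illustrative only: fallback to direct markers now requires BOTH the
    // embedded timeline server to be disabled AND no remote view storage.
    public class MarkerFallbackSketch {
      static String resolveTimelineServerBased(boolean embeddedServer, boolean remoteView) {
        if (!embeddedServer && !remoteView) {
          return "DirectWriteMarkers";             // fallback, as before
        }
        return "TimelineServerBasedWriteMarkers";  // new: remote view alone suffices
      }

      public static void main(String[] args) {
        System.out.println(resolveTimelineServerBased(false, false)); // DirectWriteMarkers
        System.out.println(resolveTimelineServerBased(false, true));  // TimelineServerBasedWriteMarkers
        System.out.println(resolveTimelineServerBased(true, false));  // TimelineServerBasedWriteMarkers
      }
    }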
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
index 3bddf709b881..4a974783ea89 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
@@ -90,6 +90,13 @@ public class TestWriteMarkersFactory extends HoodieCommonTestHarness {
         DirectWriteMarkers.class);
   }
 
+  @Test
+  public void testTimelineServerBasedMarkersWithRemoteViewStorageType() {
+    // No fallback to direct markers: remote view storage keeps timeline-server-based markers usable
+    testWriteMarkersFactory(
+        MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH, false, true, TimelineServerBasedWriteMarkers.class);
+  }
+
   @Test
   public void testTimelineServerBasedMarkersWithHDFS() {
     // Fallback to direct markers should happen
@@ -98,13 +105,21 @@ public class TestWriteMarkersFactory extends HoodieCommonTestHarness {
         DirectWriteMarkers.class);
   }
 
+  private void testWriteMarkersFactory(
+      MarkerType markerTypeConfig, String basePath, boolean isTimelineServerEnabled, Class<?> expectedWriteMarkersClass) {
+    testWriteMarkersFactory(
+        markerTypeConfig, basePath, isTimelineServerEnabled, false, expectedWriteMarkersClass);
+  }
+
   private void testWriteMarkersFactory(
       MarkerType markerTypeConfig, String basePath, boolean isTimelineServerEnabled,
-      Class<?> expectedWriteMarkersClass) {
+      boolean isRemoteViewStorageType, Class<?> expectedWriteMarkersClass) {
     String instantTime = "001";
     Mockito.when(table.getConfig()).thenReturn(writeConfig);
     Mockito.when(writeConfig.isEmbeddedTimelineServerEnabled())
         .thenReturn(isTimelineServerEnabled);
+    Mockito.when(writeConfig.isRemoteViewStorageType())
+        .thenReturn(isRemoteViewStorageType);
     Mockito.when(table.getMetaClient()).thenReturn(metaClient);
     Mockito.when(metaClient.getStorage()).thenReturn(storage);
     Mockito.when(storage.getFileSystem()).thenReturn(fileSystem);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 2e5093390e4b..f3d9897c64a1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -23,6 +23,9 @@ import org.apache.hudi.exception.InvalidUnionTypeException;
 import org.apache.hudi.exception.MissingSchemaFieldException;
 import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
 import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
@@ -37,7 +40,9 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.CollectionUtils.reduce;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;
 
 /**
  * Utils for Avro Schema.
@@ -447,4 +452,23 @@ public class AvroSchemaUtils {
   public static String createSchemaErrorString(String errorMessage, Schema writerSchema, Schema tableSchema) {
     return String.format("%s\nwriterSchema: %s\ntableSchema: %s", errorMessage, writerSchema, tableSchema);
   }
+
+  /**
+   * Creates a new schema by forcibly changing all fields to nullable.
+   *
+   * @param schema original schema
+   * @return a new schema with all fields updated to be nullable
+   */
+  public static Schema asNullable(Schema schema) {
+    List<String> filterCols = schema.getFields().stream()
+        .filter(f -> !isNullable(f.schema())).map(Schema.Field::name).collect(Collectors.toList());
+    if (filterCols.isEmpty()) {
+      return schema;
+    }
+    InternalSchema internalSchema = convert(schema);
+    TableChanges.ColumnUpdateChange schemaChange = TableChanges.ColumnUpdateChange.get(internalSchema);
+    schemaChange = reduce(filterCols, schemaChange,
+        (change, field) -> change.updateColumnNullability(field, true));
+    return convert(SchemaChangeUtils.applyTableChanges2Schema(internalSchema, schemaChange), schema.getFullName());
+  }
 }
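
A hedged sketch of what asNullable is meant to achieve, in plain Avro terms
(SchemaBuilder and Schema#isNullable are standard Avro APIs; the demo class is
hypothetical, and the committed implementation above goes through Hudi's
InternalSchema machinery instead):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class AsNullableSketch {
      public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("rec").fields()
            .requiredString("uuid")   // record key written as NOT NULL by Flink
            .optionalString("name")   // already a union [null, string]
            .endRecord();
        System.out.println(schema.getField("uuid").schema().isNullable()); // false
        System.out.println(schema.getField("name").schema().isNullable()); // true
        // Expected effect of AvroSchemaUtils.asNullable(schema): "uuid" becomes
        // a union [null, string] too, so files written by Spark (nullable key)
        // pass schema validation during the clustering scan.
      }
    }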
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 2e1bfd604d70..ca1a23f4e116 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.clustering;
 
 import org.apache.hudi.adapter.MaskingOutputAdapter;
 import org.apache.hudi.adapter.Utils;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -164,7 +165,10 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     this.table = writeClient.getHoodieTable();
 
     this.schema = AvroSchemaConverter.convertToSchema(rowType);
-    this.readerSchema = this.schema;
+    // Flink and Spark differ in how they handle the nullability of the primary key field,
+    // and some files may have been written by Spark, so force the schema to nullable to make
+    // sure the clustering scan succeeds without a schema validation exception.
+    this.readerSchema = AvroSchemaUtils.asNullable(schema);
     this.requiredPos = getRequiredPositions();
 
     this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index ab39bce3d7f0..7a397bf45c64 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -71,6 +71,7 @@ import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -242,7 +243,12 @@ public class HoodieTableSource implements
     } else if (OptionsResolver.isAppendMode(conf)) {
       return source.partitionCustom(new StreamReadAppendPartitioner(conf.getInteger(FlinkOptions.READ_TASKS)), new StreamReadAppendKeySelector());
     } else {
-      return source.keyBy(MergeOnReadInputSplit::getFileId);
+      return source.keyBy(new KeySelector<MergeOnReadInputSplit, String>() {
+        @Override
+        public String getKey(MergeOnReadInputSplit split) throws Exception {
+          return split.getFileId();
+        }
+      });
     }
   }
 
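
The switch from the MergeOnReadInputSplit::getFileId method reference to an
explicit anonymous KeySelector presumably avoids Flink's type-extraction
pitfalls with lambdas and method references; a minimal sketch of the same
pattern on a stand-in POJO (both classes here are illustrative):

    import org.apache.flink.api.java.functions.KeySelector;

    public class KeySelectorSketch {
      static class Split {
        private final String fileId;
        Split(String fileId) { this.fileId = fileId; }
        String getFileId() { return fileId; }
      }

      public static void main(String[] args) throws Exception {
        // Anonymous classes keep their generic signature at runtime, which the
        // type extractor can read reliably.
        KeySelector<Split, String> byFileId = new KeySelector<Split, String>() {
          @Override
          public String getKey(Split split) {
            return split.getFileId();
          }
        };
        System.out.println(byFileId.getKey(new Split("f-0001"))); // f-0001
      }
    }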
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index bd6da22965d0..040297e4e1f4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -31,7 +33,10 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.utils.TestWriteBase;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
@@ -40,14 +45,18 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for stream write.
@@ -673,6 +682,27 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .end();
   }
 
+  @ParameterizedTest
+  @EnumSource(MarkerType.class)
+  public void testMarkType(MarkerType markerType) throws Exception {
+    conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), markerType.toString());
+    TestHarness testHarness =
+        preparePipeline(conf)
+            .consume(TestData.DATA_SET_INSERT)
+            // no checkpoint, so the coordinator does not accept any events
+            .emptyEventBuffer()
+            .checkpoint(1)
+            .assertNextEvent(4, "par1,par2,par3,par4");
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    List<StoragePathInfo> files = metaClient.getStorage().listFiles(new StoragePath(metaClient.getTempFolderPath()));
+    if (markerType == MarkerType.DIRECT) {
+      assertTrue(files.stream().allMatch(f -> f.getPath().getName().endsWith("marker.CREATE")));
+    } else {
+      assertTrue(files.stream().noneMatch(f -> f.getPath().getName().endsWith("marker.CREATE")));
+    }
+    testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 94214661ea87..ab2f71e291a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -20,19 +20,29 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 /**
  * Test cases for delta stream write.
  */
@@ -201,6 +211,27 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
         .end();
   }
 
+  @ParameterizedTest
+  @EnumSource(MarkerType.class)
+  public void testMarkType(MarkerType markerType) throws Exception {
+    conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), markerType.toString());
+    TestHarness testHarness =
+        preparePipeline(conf)
+            .consume(TestData.DATA_SET_INSERT)
+            // no checkpoint, so the coordinator does not accept any events
+            .emptyEventBuffer()
+            .checkpoint(1)
+            .assertNextEvent(4, "par1,par2,par3,par4");
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    List<StoragePathInfo> files = metaClient.getStorage().listFiles(new StoragePath(metaClient.getTempFolderPath()));
+    if (markerType == MarkerType.DIRECT) {
+      assertTrue(files.stream().allMatch(f -> f.getPath().getName().endsWith("marker.APPEND")));
+    } else {
+      assertTrue(files.stream().noneMatch(f -> f.getPath().getName().endsWith("marker.APPEND")));
+    }
+    testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index abc5679367a1..98563265b2db 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.configuration.FlinkOptions;
 
 import org.apache.flink.configuration.Configuration;
@@ -74,4 +75,9 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
   }
+
+  @Override
+  public void testMarkType(MarkerType markerType) throws Exception {
+    // do nothing.
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index ec2211f02cf3..adeffbdc6f5d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -74,6 +74,8 @@ import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.util.HashMap;
@@ -535,6 +537,101 @@ public class ITTestHoodieFlinkClustering {
     // wait for the asynchronous commit to finish
     TimeUnit.SECONDS.sleep(3);
 
+    runCluster(rowType);
+
+    // test output
+    final Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, id2,par1,id2,Stephen,33,2100001,par1]");
+    expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, id4,par2,id4,Fabian,31,4100001,par2]");
+    expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, id6,par3,id6,Emma,20,6100001,par3]");
+    expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
+    TestData.checkWrittenData(tempFile, expected, 4);
+  }
+
+  @Test
+  public void testOfflineClusterFailoverAfterCommit() throws Exception {
+    StreamTableEnvironment tableEnv = prepareEnvAndTable();
+
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.targetPartitions = 4;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+    assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
+
+    Table result = tableEnv.sqlQuery("select count(*) from t1");
+    assertEquals(16L, tableEnv.toDataStream(result, Row.class).executeAndCollect(1).get(0).getField(0));
+  }
+
+  private StreamTableEnvironment prepareEnvAndTable() {
+    // Create the hoodie table and insert data.
+    Configuration conf = new org.apache.flink.configuration.Configuration();
+    conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+    tEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+    // use append mode
+    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
+
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tEnv.executeSql(hoodieTableDDL);
+    tEnv.executeSql(TestSQL.INSERT_T1);
+    return tEnv;
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInsertWithDifferentRecordKeyNullabilityAndClustering(boolean withPk) throws Exception {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+
+    // If the table is created without a primary key, the record key field is nullable;
+    // otherwise it is not null.
+    String pkConstraint = withPk ? ",  primary key (uuid) not enforced\n" : "";
+    String tableDDL = "create table t1(\n"
+        + "  `uuid` VARCHAR(20)\n"
+        + ",  `name` VARCHAR(10)\n"
+        + ",  `age` INT\n"
+        + ",  `ts` TIMESTAMP(3)\n"
+        + ",  `partition` VARCHAR(10)\n"
+        + pkConstraint
+        + ")\n"
+        + "PARTITIONED BY (`partition`)\n"
+        + "with (\n"
+        + "  'connector' = 'hudi',\n"
+        + "  'hoodie.datasource.write.recordkey.field' = 'uuid',\n"
+        + "  'path' = '" + tempFile.getAbsolutePath() + "'\n"
+        + ")";
+    tableEnv.executeSql(tableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    final RowType rowType = (RowType) DataTypes.ROW(
+            DataTypes.FIELD("uuid", DataTypes.VARCHAR(20).notNull()), // primary key set as not null
+            DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+            DataTypes.FIELD("age", DataTypes.INT()),
+            DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
+            DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+        .notNull().getLogicalType();
+
+    // run cluster with row type
+    runCluster(rowType);
+
+    final Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
+    expected.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
+    expected.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
+    expected.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
+    TestData.checkWrittenData(tempFile, expected, 4);
+  }
+
+  private void runCluster(RowType rowType) throws Exception {
     // make configuration and setAvroSchema.
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     FlinkClusteringConfig cfg = new FlinkClusteringConfig();
@@ -606,53 +703,9 @@ public class ITTestHoodieFlinkClustering {
           .setParallelism(1);
 
       env.execute("flink_hudi_clustering");
-
-      // test output
-      final Map<String, String> expected = new HashMap<>();
-      expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, id2,par1,id2,Stephen,33,2100001,par1]");
-      expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, id4,par2,id4,Fabian,31,4100001,par2]");
-      expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, id6,par3,id6,Emma,20,6100001,par3]");
-      expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
-      TestData.checkWrittenData(tempFile, expected, 4);
     }
   }
 
-  @Test
-  public void testOfflineClusterFailoverAfterCommit() throws Exception {
-    StreamTableEnvironment tableEnv = prepareEnvAndTable();
-
-    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
-    cfg.path = tempFile.getAbsolutePath();
-    cfg.targetPartitions = 4;
-    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
-    assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
-
-    Table result = tableEnv.sqlQuery("select count(*) from t1");
-    assertEquals(16L, tableEnv.toDataStream(result, Row.class).executeAndCollect(1).get(0).getField(0));
-  }
-
-  private StreamTableEnvironment prepareEnvAndTable() {
-    // Create hoodie table and insert into data.
-    Configuration conf = new org.apache.flink.configuration.Configuration();
-    conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
-    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-    tEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
-    tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);
-    Map<String, String> options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-
-    // use append mode
-    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
-    options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
-    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
-
-    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
-    tEnv.executeSql(hoodieTableDDL);
-    tEnv.executeSql(TestSQL.INSERT_T1);
-    return tEnv;
-  }
-
   /**
    * schedule clustering, insert another batch, run clustering.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 99b0136b52ee..ebd1222cc1f2 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,8 +21,10 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
@@ -2142,6 +2144,21 @@ public class ITTestHoodieDataSource {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testWriteWithTimelineServerBasedMarker(HoodieTableType tableType) {
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.TIMELINE_SERVER_BASED.name())
+        .end();
+    batchTableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+    List<Row> rows = CollectionUtil.iteratorToList(batchTableEnv.executeSql("select * from t1").collect());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 262eea1f329a..01985c1f7153 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -19,10 +19,17 @@
 
 package org.apache.hudi.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.DirectWriteMarkers;
+import org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -30,11 +37,14 @@ import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Test cases for {@link org.apache.hudi.util.FlinkWriteClients}.
@@ -58,4 +68,29 @@ public class TestFlinkWriteClients {
     assertThat(writeConfig.getLockProviderClass(), is(FileSystemBasedLockProvider.class.getName()));
     assertThat(writeConfig.getWriteConcurrencyMode(), is(WriteConcurrencyMode.SINGLE_WRITER));
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"", "DIRECT", "TIMELINE_SERVER_BASED"})
+  void testMarkerType(String markerType) throws Exception {
+    // create table
+    StreamerUtil.initTableIfNotExists(conf);
+    // This is expected to be used by the driver; the client can then send requests for the files view.
+    FlinkWriteClients.createWriteClient(conf);
+
+    // do not set the marker type, to test the default behavior
+    if (!markerType.isEmpty()) {
+      conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.valueOf(markerType).name());
+    }
+    // This is expected to be used by the writer client
+    HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, true);
+    try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(HoodieFlinkEngineContext.DEFAULT, writeConfig)) {
+      HoodieTable table = writeClient.getHoodieTable();
+      String markerClass = WriteMarkersFactory.get(writeConfig.getMarkersType(), table, "001").getClass().getSimpleName();
+      if (markerType.isEmpty() || markerType.equals("DIRECT")) {
+        assertEquals(DirectWriteMarkers.class.getSimpleName(), markerClass);
+      } else {
+        assertEquals(TimelineServerBasedWriteMarkers.class.getSimpleName(), markerClass);
+      }
+    }
+  }
 }
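
For users, the practical upshot: with this fix a Flink writer configured along
the lines below keeps timeline-server-based markers even when the embedded
timeline server is off. The option keys are real Hudi config names; the wiring
of a standalone timeline server is deployment-specific, so the snippet is only
an illustrative sketch:

    import org.apache.flink.configuration.Configuration;

    public class MarkerConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("hoodie.write.markers.type", "TIMELINE_SERVER_BASED");
        // REMOTE_FIRST/REMOTE_ONLY view storage now prevents the silent
        // fallback to DIRECT markers when the embedded server is disabled.
        conf.setString("hoodie.filesystem.view.type", "REMOTE_FIRST");
        conf.setString("hoodie.embed.timeline.server", "false");
        System.out.println(conf);
      }
    }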
