This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new b87959cec529 fix: Cherry pick flink bug fixes for release 0.14.2 and 0.15.1 (#17782)
b87959cec529 is described below
commit b87959cec52911512932398ac02517055db47001
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Jan 6 11:24:12 2026 +0800
fix: Cherry pick flink bug fixes for release 0.14.2 and 0.15.1 (#17782)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 7 +
.../hudi/table/marker/WriteMarkersFactory.java | 2 +-
.../hudi/table/marker/TestWriteMarkersFactory.java | 17 ++-
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 24 ++++
.../hudi/sink/clustering/ClusteringOperator.java | 6 +-
.../org/apache/hudi/table/HoodieTableSource.java | 8 +-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 30 +++++
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 31 +++++
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 6 +
.../sink/cluster/ITTestHoodieFlinkClustering.java | 141 ++++++++++++++-------
.../apache/hudi/table/ITTestHoodieDataSource.java | 17 +++
.../apache/hudi/utils/TestFlinkWriteClients.java | 35 +++++
12 files changed, 276 insertions(+), 48 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index a369b748ee26..2419071d41de 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -48,6 +48,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -1415,6 +1416,12 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
}
+ public boolean isRemoteViewStorageType() {
+ FileSystemViewStorageType storageType = getViewStorageConfig().getStorageType();
+ return storageType == FileSystemViewStorageType.REMOTE_ONLY
+ || storageType == FileSystemViewStorageType.REMOTE_FIRST;
+ }
+
public int getEmbeddedTimelineServerPort() {
return Integer.parseInt(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_PORT_NUM));
}
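Context for reviewers: REMOTE_ONLY and REMOTE_FIRST are the FileSystemViewStorageType values that serve the file-system view through a timeline service. A minimal sketch of a configuration where the new helper returns true; the builder names are taken from Hudi's public config API but should be verified against this branch:

    // Illustrative sketch only, not part of this patch.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")
        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
            .withStorageType(FileSystemViewStorageType.REMOTE_FIRST) // or REMOTE_ONLY
            .build())
        .build();
    // writeConfig.isRemoteViewStorageType() == true for either remote type.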
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
index c2317bf9bf89..14f0046fa027 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
@@ -46,7 +46,7 @@ public class WriteMarkersFactory {
case DIRECT:
return new DirectWriteMarkers(table, instantTime);
case TIMELINE_SERVER_BASED:
- if (!table.getConfig().isEmbeddedTimelineServerEnabled()) {
+ if (!table.getConfig().isEmbeddedTimelineServerEnabled() && !table.getConfig().isRemoteViewStorageType()) {
LOG.warn("Timeline-server-based markers are configured as the marker type "
+ "but embedded timeline server is not enabled. Falling back to direct markers.");
return new DirectWriteMarkers(table, instantTime);
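The net effect of the one-line change: the fallback to direct markers now happens only when the embedded timeline server is disabled and no remote file-system view is configured. Restated as a standalone predicate (derived from the diff above, not code in the patch):

    // Fallback decision after this patch:
    boolean fallbackToDirect = !writeConfig.isEmbeddedTimelineServerEnabled()
        && !writeConfig.isRemoteViewStorageType();
    // A REMOTE_ONLY/REMOTE_FIRST view implies a reachable timeline service, so
    // timeline-server-based markers stay usable without the embedded server.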
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
index 3bddf709b881..4a974783ea89 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java
@@ -90,6 +90,13 @@ public class TestWriteMarkersFactory extends HoodieCommonTestHarness {
DirectWriteMarkers.class);
}
+ @Test
+ public void testTimelineServerBasedMarkersWithRemoteViewStorageType() {
+ // No fallback to direct markers should happen: the remote view storage type keeps timeline-server-based markers usable
+ testWriteMarkersFactory(
+ MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH, false, true, TimelineServerBasedWriteMarkers.class);
+ }
+
@Test
public void testTimelineServerBasedMarkersWithHDFS() {
// Fallback to direct markers should happen
@@ -98,13 +105,21 @@ public class TestWriteMarkersFactory extends HoodieCommonTestHarness {
DirectWriteMarkers.class);
}
+ private void testWriteMarkersFactory(
+ MarkerType markerTypeConfig, String basePath, boolean isTimelineServerEnabled, Class<?> expectedWriteMarkersClass) {
+ testWriteMarkersFactory(
+ markerTypeConfig, basePath, isTimelineServerEnabled, false, expectedWriteMarkersClass);
+ }
+
private void testWriteMarkersFactory(
MarkerType markerTypeConfig, String basePath, boolean isTimelineServerEnabled,
- Class<?> expectedWriteMarkersClass) {
+ boolean isRemoteViewStorageType, Class<?> expectedWriteMarkersClass) {
String instantTime = "001";
Mockito.when(table.getConfig()).thenReturn(writeConfig);
Mockito.when(writeConfig.isEmbeddedTimelineServerEnabled())
.thenReturn(isTimelineServerEnabled);
+ Mockito.when(writeConfig.isRemoteViewStorageType())
+ .thenReturn(isRemoteViewStorageType);
Mockito.when(table.getMetaClient()).thenReturn(metaClient);
Mockito.when(metaClient.getStorage()).thenReturn(storage);
Mockito.when(storage.getFileSystem()).thenReturn(fileSystem);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 2e5093390e4b..f3d9897c64a1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -23,6 +23,9 @@ import org.apache.hudi.exception.InvalidUnionTypeException;
import org.apache.hudi.exception.MissingSchemaFieldException;
import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
@@ -37,7 +40,9 @@ import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.CollectionUtils.reduce;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;
/**
* Utils for Avro Schema.
@@ -447,4 +452,23 @@ public class AvroSchemaUtils {
public static String createSchemaErrorString(String errorMessage, Schema writerSchema, Schema tableSchema) {
return String.format("%s\nwriterSchema: %s\ntableSchema: %s", errorMessage, writerSchema, tableSchema);
}
+
+ /**
+ * Create a new schema by force changing all the fields as nullable.
+ *
+ * @param schema original schema
+ * @return a new schema with all the fields updated as nullable.
+ */
+ public static Schema asNullable(Schema schema) {
+ List<String> filterCols = schema.getFields().stream()
+ .filter(f -> !isNullable(f.schema())).map(Schema.Field::name).collect(Collectors.toList());
+ if (filterCols.isEmpty()) {
+ return schema;
+ }
+ InternalSchema internalSchema = convert(schema);
+ TableChanges.ColumnUpdateChange schemaChange = TableChanges.ColumnUpdateChange.get(internalSchema);
+ schemaChange = reduce(filterCols, schemaChange,
+ (change, field) -> change.updateColumnNullability(field, true));
+ return convert(SchemaChangeUtils.applyTableChanges2Schema(internalSchema, schemaChange), schema.getFullName());
+ }
}
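A short usage sketch for the new helper; the schema literal is illustrative, not taken from the patch:

    // A record with one non-null field and one already-nullable field.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":\"string\"},"
            + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
    Schema relaxed = AvroSchemaUtils.asNullable(schema);
    // Only "uuid" is rewritten as ["null","string"]; when no field needs the
    // change, the original Schema instance is returned unchanged.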
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 2e1bfd604d70..ca1a23f4e116 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.clustering;
import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.adapter.Utils;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -164,7 +165,10 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
this.table = writeClient.getHoodieTable();
this.schema = AvroSchemaConverter.convertToSchema(rowType);
- this.readerSchema = this.schema;
+ // Since there are discrepancies between Flink and Spark in handling the nullability of the primary key
+ // field, and some files may have been written by Spark, force the reader schema to be nullable so that the
+ // clustering scan succeeds without a schema validation exception.
+ this.readerSchema = AvroSchemaUtils.asNullable(schema);
this.requiredPos = getRequiredPositions();
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
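To make the new comment concrete, this is the kind of mismatch the relaxed reader schema guards against (the schemas below are illustrative):

    // File written by Spark, record key stored as a nullable union:
    //   {"name": "uuid", "type": ["null", "string"]}
    // Reader schema Flink derives from a table with a primary key, non-null:
    //   {"name": "uuid", "type": "string"}
    // Scanning the Spark-written file with the stricter reader schema fails
    // schema validation; asNullable(schema) relaxes the reader side instead.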
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index ab39bce3d7f0..7a397bf45c64 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -71,6 +71,7 @@ import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -242,7 +243,12 @@ public class HoodieTableSource implements
} else if (OptionsResolver.isAppendMode(conf)) {
return source.partitionCustom(new StreamReadAppendPartitioner(conf.getInteger(FlinkOptions.READ_TASKS)), new StreamReadAppendKeySelector());
} else {
- return source.keyBy(MergeOnReadInputSplit::getFileId);
+ return source.keyBy(new KeySelector<MergeOnReadInputSplit, String>() {
+ @Override
+ public String getKey(MergeOnReadInputSplit split) throws Exception {
+ return split.getFileId();
+ }
+ });
}
}
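The keyBy change swaps a method reference for an explicit anonymous KeySelector. The commit does not state the motivation; a plausible reading is that the anonymous class pins down the <MergeOnReadInputSplit, String> type parameters for Flink's type extraction. Both forms compute the same key:

    // The equivalent one-liner the patch moves away from:
    source.keyBy(MergeOnReadInputSplit::getFileId);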
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index bd6da22965d0..040297e4e1f4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -31,7 +33,10 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.utils.TestWriteBase;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -40,14 +45,18 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for stream write.
@@ -673,6 +682,27 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.end();
}
+ @ParameterizedTest
+ @EnumSource(MarkerType.class)
+ public void testMarkType(MarkerType markerType) throws Exception {
+ conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), markerType.toString());
+ TestHarness testHarness =
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT)
+ // no checkpoint, so the coordinator does not accept any events
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertNextEvent(4, "par1,par2,par3,par4");
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ List<StoragePathInfo> files = metaClient.getStorage().listFiles(new StoragePath(metaClient.getTempFolderPath()));
+ if (markerType == MarkerType.DIRECT) {
+ assertTrue(files.stream().allMatch(f -> f.getPath().getName().endsWith("marker.CREATE")));
+ } else {
+ assertTrue(files.stream().noneMatch(f -> f.getPath().getName().endsWith("marker.CREATE")));
+ }
+ }
+ testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 94214661ea87..ab2f71e291a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -20,19 +20,29 @@ package org.apache.hudi.sink;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
* Test cases for delta stream write.
*/
@@ -201,6 +211,27 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
.end();
}
+ @ParameterizedTest
+ @EnumSource(MarkerType.class)
+ public void testMarkType(MarkerType markerType) throws Exception {
+ conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), markerType.toString());
+ TestHarness testHarness =
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT)
+ // no checkpoint, so the coordinator does not accept any events
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertNextEvent(4, "par1,par2,par3,par4");
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ List<StoragePathInfo> files = metaClient.getStorage().listFiles(new StoragePath(metaClient.getTempFolderPath()));
+ if (markerType == MarkerType.DIRECT) {
+ assertTrue(files.stream().allMatch(f -> f.getPath().getName().endsWith("marker.APPEND")));
+ } else {
+ assertTrue(files.stream().noneMatch(f -> f.getPath().getName().endsWith("marker.APPEND")));
+ }
+ testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
+ }
+
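Note the suffix difference from the copy-on-write variant of this test: COW inserts create base files, so direct markers end in marker.CREATE, while MOR writes append to log files, whose markers end in marker.APPEND. The assumed direct-marker naming, to be verified against this branch, is roughly:

    // <data file name>.marker.<IO type>, under .hoodie/.temp/<instant>/
    //   COW insert -> <base file name>.marker.CREATE
    //   MOR append -> <log file name>.marker.APPEND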
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index abc5679367a1..98563265b2db 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -19,6 +19,7 @@
package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
@@ -74,4 +75,9 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
}
+
+ @Override
+ public void testMarkType(MarkerType markerType) throws Exception {
+ // do nothing.
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index ec2211f02cf3..adeffbdc6f5d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -74,6 +74,8 @@ import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.HashMap;
@@ -535,6 +537,101 @@ public class ITTestHoodieFlinkClustering {
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);
+ runCluster(rowType);
+
+ // test output
+ final Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1,
id2,par1,id2,Stephen,33,2100001,par1]");
+ expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2,
id4,par2,id4,Fabian,31,4100001,par2]");
+ expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3,
id6,par3,id6,Emma,20,6100001,par3]");
+ expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4,
id8,par4,id8,Han,56,8100001,par4]");
+ TestData.checkWrittenData(tempFile, expected, 4);
+ }
+
+ @Test
+ public void testOfflineClusterFailoverAfterCommit() throws Exception {
+ StreamTableEnvironment tableEnv = prepareEnvAndTable();
+
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ cfg.targetPartitions = 4;
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+ assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
+
+ Table result = tableEnv.sqlQuery("select count(*) from t1");
+ assertEquals(16L, tableEnv.toDataStream(result, Row.class).executeAndCollect(1).get(0).getField(0));
+ }
+
+ private StreamTableEnvironment prepareEnvAndTable() {
+ // Create hoodie table and insert into data.
+ Configuration conf = new org.apache.flink.configuration.Configuration();
+ conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ tEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+ tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+ // use append mode
+ options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+ options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+ options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
+
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ tEnv.executeSql(hoodieTableDDL);
+ tEnv.executeSql(TestSQL.INSERT_T1);
+ return tEnv;
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testInsertWithDifferentRecordKeyNullabilityAndClustering(boolean withPk) throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+
+ // If the table is created without a primary key, the record key field is nullable;
+ // otherwise, it is not nullable.
+ String pkConstraint = withPk ? ", primary key (uuid) not enforced\n" : "";
+ String tblWithoutPkDDL = "create table t1(\n"
+ + " `uuid` VARCHAR(20)\n"
+ + ", `name` VARCHAR(10)\n"
+ + ", `age` INT\n"
+ + ", `ts` TIMESTAMP(3)\n"
+ + ", `partition` VARCHAR(10)\n"
+ + pkConstraint
+ + ")\n"
+ + "PARTITIONED BY (`partition`)\n"
+ + "with (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'hoodie.datasource.write.recordkey.field' = 'uuid',\n"
+ + " 'path' = '" + tempFile.getAbsolutePath() + "'\n"
+ + ")";
+ tableEnv.executeSql(tblWithoutPkDDL);
+ tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+ final RowType rowType = (RowType) DataTypes.ROW(
+ DataTypes.FIELD("uuid", DataTypes.VARCHAR(20).notNull()), //
primary key set as not null
+ DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull().getLogicalType();
+
+ // run cluster with row type
+ runCluster(rowType);
+
+ final Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1000,par1,
id2,par1,id2,Stephen,33,2000,par1]");
+ expected.put("par2", "[id3,par2,id3,Julian,53,3000,par2,
id4,par2,id4,Fabian,31,4000,par2]");
+ expected.put("par3", "[id5,par3,id5,Sophia,18,5000,par3,
id6,par3,id6,Emma,20,6000,par3]");
+ expected.put("par4", "[id7,par4,id7,Bob,44,7000,par4,
id8,par4,id8,Han,56,8000,par4]");
+ TestData.checkWrittenData(tempFile, expected, 4);
+ }
+
+ private void runCluster(RowType rowType) throws Exception {
// make configuration and setAvroSchema.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
@@ -606,53 +703,9 @@ public class ITTestHoodieFlinkClustering {
.setParallelism(1);
env.execute("flink_hudi_clustering");
-
- // test output
- final Map<String, String> expected = new HashMap<>();
- expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1,
id2,par1,id2,Stephen,33,2100001,par1]");
- expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2,
id4,par2,id4,Fabian,31,4100001,par2]");
- expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3,
id6,par3,id6,Emma,20,6100001,par3]");
- expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4,
id8,par4,id8,Han,56,8100001,par4]");
- TestData.checkWrittenData(tempFile, expected, 4);
}
}
- @Test
- public void testOfflineClusterFailoverAfterCommit() throws Exception {
- StreamTableEnvironment tableEnv = prepareEnvAndTable();
-
- FlinkClusteringConfig cfg = new FlinkClusteringConfig();
- cfg.path = tempFile.getAbsolutePath();
- cfg.targetPartitions = 4;
- Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
- assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
-
- Table result = tableEnv.sqlQuery("select count(*) from t1");
- assertEquals(16L, tableEnv.toDataStream(result, Row.class).executeAndCollect(1).get(0).getField(0));
- }
-
- private StreamTableEnvironment prepareEnvAndTable() {
- // Create hoodie table and insert into data.
- Configuration conf = new org.apache.flink.configuration.Configuration();
- conf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- tEnv.getConfig().getConfiguration().setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
- tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DML_SYNC, true);
- Map<String, String> options = new HashMap<>();
- options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
-
- // use append mode
- options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
- options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
- options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
-
- String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
- tEnv.executeSql(hoodieTableDDL);
- tEnv.executeSql(TestSQL.INSERT_T1);
- return tEnv;
- }
-
/**
* schedule clustering, insert another batch, run clustering.
*/
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 99b0136b52ee..ebd1222cc1f2 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,8 +21,10 @@ package org.apache.hudi.table;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
@@ -2142,6 +2144,21 @@ public class ITTestHoodieDataSource {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testWriteWithTimelineServerBasedMarker(HoodieTableType tableType) {
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.TIMELINE_SERVER_BASED.name())
+ .end();
+ batchTableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+ List<Row> rows = CollectionUtil.iteratorToList(batchTableEnv.executeSql("select * from t1").collect());
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 262eea1f329a..01985c1f7153 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -19,10 +19,17 @@
package org.apache.hudi.utils;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.DirectWriteMarkers;
+import org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
@@ -30,11 +37,14 @@ import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Test cases for {@link org.apache.hudi.util.FlinkWriteClients}.
@@ -58,4 +68,29 @@ public class TestFlinkWriteClients {
assertThat(writeConfig.getLockProviderClass(), is(FileSystemBasedLockProvider.class.getName()));
assertThat(writeConfig.getWriteConcurrencyMode(), is(WriteConcurrencyMode.SINGLE_WRITER));
}
+
+ @ParameterizedTest
+ @ValueSource(strings = {"", "DIRECT", "TIMELINE_SERVER_BASED"})
+ void testMarkerType(String markerType) throws Exception {
+ // create table
+ StreamerUtil.initTableIfNotExists(conf);
+ // This write client is expected to be used by the driver; clients can then send requests for the file system view.
+ FlinkWriteClients.createWriteClient(conf);
+
+ // do not set the marker type, to test the default behavior
+ if (!markerType.isEmpty()) {
+ conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.valueOf(markerType).name());
+ }
+ // This config is expected to be used by the writer client
+ HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, true);
+ try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(HoodieFlinkEngineContext.DEFAULT, writeConfig)) {
+ HoodieTable table = writeClient.getHoodieTable();
+ String markerClass = WriteMarkersFactory.get(writeConfig.getMarkersType(), table, "001").getClass().getSimpleName();
+ if (markerType.isEmpty() || markerType.equals("DIRECT")) {
+ assertEquals(DirectWriteMarkers.class.getSimpleName(), markerClass);
+ } else {
+ assertEquals(TimelineServerBasedWriteMarkers.class.getSimpleName(), markerClass);
+ }
+ }
+ }
}