This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 46a2399 [HUDI-1902] Global index for flink writer (#2958)
46a2399 is described below
commit 46a2399a45803e1b0d863eec47168a09c37ab920
Author: Danny Chan <[email protected]>
AuthorDate: Tue May 18 13:55:38 2021 +0800
[HUDI-1902] Global index for flink writer (#2958)
Supports deduplication for record keys with different partition path.
---
.../apache/hudi/common/model/BaseAvroPayload.java | 2 +-
.../common/model/HoodieRecordGlobalLocation.java | 97 ++++++++++++++++++++
.../hudi/common/model/HoodieRecordLocation.java | 4 +-
.../apache/hudi/configuration/FlinkOptions.java | 7 ++
.../sink/partitioner/BucketAssignFunction.java | 100 +++++++++++++++------
.../sink/transform/RowDataToHoodieFunction.java | 59 +-----------
.../apache/hudi/sink/utils/PayloadCreation.java | 93 +++++++++++++++++++
.../apache/hudi/table/HoodieDataSourceITCase.java | 50 +++++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 14 ++-
hudi-flink/src/test/resources/test_source_4.data | 8 ++
10 files changed, 345 insertions(+), 89 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
index 3b35b0d..cd3a95e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
@@ -37,7 +37,7 @@ public abstract class BaseAvroPayload implements Serializable
{
/**
* For purposes of preCombining.
*/
- protected final Comparable orderingVal;
+ public final Comparable orderingVal;
/**
* Instantiate {@link BaseAvroPayload}.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
new file mode 100644
index 0000000..f469a1a
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Similar to {@link org.apache.hudi.common.model.HoodieRecordLocation} but
with partition path.
+ */
+public class HoodieRecordGlobalLocation extends HoodieRecordLocation {
+ private static final long serialVersionUID = 1L;
+
+ private String partitionPath;
+
+ public HoodieRecordGlobalLocation() {
+ }
+
+ public HoodieRecordGlobalLocation(String partitionPath, String instantTime,
String fileId) {
+ super(instantTime, fileId);
+ this.partitionPath = partitionPath;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("HoodieGlobalRecordLocation {");
+ sb.append("partitionPath=").append(partitionPath).append(", ");
+ sb.append("instantTime=").append(instantTime).append(", ");
+ sb.append("fileId=").append(fileId);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HoodieRecordGlobalLocation otherLoc = (HoodieRecordGlobalLocation) o;
+ return Objects.equals(partitionPath, otherLoc.partitionPath)
+ && Objects.equals(instantTime, otherLoc.instantTime)
+ && Objects.equals(fileId, otherLoc.fileId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitionPath, instantTime, fileId);
+ }
+
+ public String getPartitionPath() {
+ return partitionPath;
+ }
+
+ public void setPartitionPath(String partitionPath) {
+ this.partitionPath = partitionPath;
+ }
+
+ /**
+ * Returns the global record location from local.
+ */
+ public static HoodieRecordGlobalLocation fromLocal(String partitionPath,
HoodieRecordLocation localLoc) {
+ return new HoodieRecordGlobalLocation(partitionPath,
localLoc.getInstantTime(), localLoc.getFileId());
+ }
+
+ /**
+ * Returns the record location as local.
+ */
+ public HoodieRecordLocation toLocal(String instantTime) {
+ return new HoodieRecordLocation(instantTime, fileId);
+ }
+
+ /**
+ * Copy the location with given partition path.
+ */
+ public HoodieRecordGlobalLocation copy(String partitionPath) {
+ return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId);
+ }
+}
+
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
index 1692cfb..2b1feab 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
@@ -26,8 +26,8 @@ import java.util.Objects;
*/
public class HoodieRecordLocation implements Serializable {
- private String instantTime;
- private String fileId;
+ protected String instantTime;
+ protected String fileId;
public HoodieRecordLocation() {
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4bed143..33a16c0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -86,6 +86,13 @@ public class FlinkOptions {
.defaultValue(1.5D)
.withDescription("Index state ttl in days, default 1.5 day");
+ public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED =
ConfigOptions
+ .key("index.global.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to update index for the old partition path\n"
+ + "if same key record with different partition path came in, default
true");
+
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 0f599d2..5ad20d5 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -21,9 +21,11 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -32,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;
@@ -90,7 +93,7 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
* <li>If it does not, use the {@link BucketAssigner} to generate a new
bucket ID</li>
* </ul>
*/
- private MapState<HoodieKey, HoodieRecordLocation> indexState;
+ private MapState<String, HoodieRecordGlobalLocation> indexState;
/**
* Bucket assigner to assign new bucket IDs or reuse existing ones.
@@ -110,11 +113,23 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
*/
private MapState<String, Integer> partitionLoadState;
+ /**
+ * Used to create DELETE payload.
+ */
+ private PayloadCreation payloadCreation;
+
+ /**
+ * If the index is global, update the index for the old partition path
+ * if same key record with different partition path came in.
+ */
+ private final boolean globalIndex;
+
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
this.bootstrapIndex =
conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
+ this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
}
@Override
@@ -132,6 +147,7 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
context,
writeConfig);
+ this.payloadCreation = PayloadCreation.instance(this.conf);
}
@Override
@@ -141,11 +157,11 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
@Override
public void initializeState(FunctionInitializationContext context) {
- MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
+ MapStateDescriptor<String, HoodieRecordGlobalLocation> indexStateDesc =
new MapStateDescriptor<>(
"indexState",
- TypeInformation.of(HoodieKey.class),
- TypeInformation.of(HoodieRecordLocation.class));
+ Types.STRING,
+ TypeInformation.of(HoodieRecordGlobalLocation.class));
double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 *
1000;
if (ttl > 0) {
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
@@ -166,38 +182,41 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
// 3. if it is an INSERT, decide the location using the BucketAssigner
then send it out.
HoodieRecord<?> record = (HoodieRecord<?>) value;
final HoodieKey hoodieKey = record.getKey();
- final BucketInfo bucketInfo;
+ final String recordKey = hoodieKey.getRecordKey();
+ final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
// The dataset may be huge, thus the processing would block for long,
// disabled by default.
- if (bootstrapIndex &&
!partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+ if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) {
// If the partition records are never loaded, load the records first.
- loadRecords(hoodieKey.getPartitionPath());
+ loadRecords(partitionPath);
}
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
- if (isChangingRecords && this.indexState.contains(hoodieKey)) {
+ if (isChangingRecords && indexState.contains(recordKey)) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
- location = new HoodieRecordLocation("U",
this.indexState.get(hoodieKey).getFileId());
- this.bucketAssigner.addUpdate(record.getPartitionPath(),
location.getFileId());
- } else {
- bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath());
- switch (bucketInfo.getBucketType()) {
- case INSERT:
- // This is an insert bucket, use HoodieRecordLocation instant time
as "I".
- // Downstream operators can then check the instant time to know
whether
- // a record belongs to an insert bucket.
- location = new HoodieRecordLocation("I",
bucketInfo.getFileIdPrefix());
- break;
- case UPDATE:
- location = new HoodieRecordLocation("U",
bucketInfo.getFileIdPrefix());
- break;
- default:
- throw new AssertionError();
+ HoodieRecordGlobalLocation oldLoc = this.indexState.get(recordKey);
+ if (!StreamerUtil.equal(oldLoc.getPartitionPath(), partitionPath)) {
+ if (globalIndex) {
+ // if partition path changes, emit a delete record for old partition
path,
+ // then update the index state using location with new partition
path.
+ HoodieRecord<?> deleteRecord = new HoodieRecord<>(new
HoodieKey(recordKey, oldLoc.getPartitionPath()),
+ payloadCreation.createDeletePayload((BaseAvroPayload)
record.getData()));
+ deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
+ deleteRecord.seal();
+ out.collect((O) deleteRecord);
+ }
+ location = getNewRecordLocation(partitionPath);
+ updateIndexState(recordKey, partitionPath, location);
+ } else {
+ location = oldLoc.toLocal("U");
+ this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
+ } else {
+ location = getNewRecordLocation(partitionPath);
if (isChangingRecords) {
- this.indexState.put(hoodieKey, location);
+ updateIndexState(recordKey, partitionPath, location);
}
}
record.unseal();
@@ -206,6 +225,32 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
out.collect((O) record);
}
+ private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
+ final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
+ final HoodieRecordLocation location;
+ switch (bucketInfo.getBucketType()) {
+ case INSERT:
+ // This is an insert bucket, use HoodieRecordLocation instant time as
"I".
+ // Downstream operators can then check the instant time to know whether
+ // a record belongs to an insert bucket.
+ location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
+ break;
+ case UPDATE:
+ location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
+ break;
+ default:
+ throw new AssertionError();
+ }
+ return location;
+ }
+
+ private void updateIndexState(
+ String recordKey,
+ String partitionPath,
+ HoodieRecordLocation localLoc) throws Exception {
+ this.indexState.put(recordKey,
HoodieRecordGlobalLocation.fromLocal(partitionPath, localLoc));
+ }
+
@Override
public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
@@ -245,7 +290,8 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
boolean shouldLoad =
KeyGroupRangeAssignment.assignKeyToParallelOperator(
hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
if (shouldLoad) {
- this.indexState.put(hoodieKey, new
HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ this.indexState.put(hoodieKey.getRecordKey(),
+ new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(),
baseFile.getCommitTime(), baseFile.getFileId()));
}
} catch (Exception e) {
LOG.error("Error when putting record keys into the state from file:
{}", baseFile);
@@ -265,7 +311,7 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
@VisibleForTesting
public boolean isKeyInState(HoodieKey hoodieKey) {
try {
- return this.indexState.contains(hoodieKey);
+ return this.indexState.contains(hoodieKey.getRecordKey());
} catch (Exception e) {
throw new HoodieException(e);
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index 5bd3c68..fcf77db 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -18,16 +18,12 @@
package org.apache.hudi.sink.transform;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
@@ -39,11 +35,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
-import javax.annotation.Nullable;
-
import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
/**
* Function that transforms RowData to HoodieRecord.
@@ -116,53 +108,4 @@ public class RowDataToHoodieFunction<I extends RowData, O
extends HoodieRecord<?
HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
return new HoodieRecord<>(hoodieKey, payload);
}
-
- /**
- * Util to create hoodie pay load instance.
- */
- private static class PayloadCreation implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final boolean shouldCombine;
- private final Constructor<?> constructor;
- private final String preCombineField;
-
- private PayloadCreation(
- boolean shouldCombine,
- Constructor<?> constructor,
- @Nullable String preCombineField) {
- this.shouldCombine = shouldCombine;
- this.constructor = constructor;
- this.preCombineField = preCombineField;
- }
-
- public static PayloadCreation instance(Configuration conf) throws
Exception {
- boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
- ||
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) ==
WriteOperationType.UPSERT;
- String preCombineField = null;
- final Class<?>[] argTypes;
- final Constructor<?> constructor;
- if (shouldCombine) {
- preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
- argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
- } else {
- argTypes = new Class<?>[] {Option.class};
- }
- final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS);
- constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes);
- return new PayloadCreation(shouldCombine, constructor, preCombineField);
- }
-
- public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean
isDelete) throws Exception {
- if (shouldCombine) {
- ValidationUtils.checkState(preCombineField != null);
- Comparable<?> orderingVal = (Comparable<?>)
HoodieAvroUtils.getNestedFieldVal(record,
- preCombineField, false);
- return (HoodieRecordPayload<?>) constructor.newInstance(
- isDelete ? null : record, orderingVal);
- } else {
- return (HoodieRecordPayload<?>)
this.constructor.newInstance(Option.of(record));
- }
- }
- }
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
new file mode 100644
index 0000000..831da25
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.BaseAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+
+/**
+ * Util to create hoodie payload instance.
+ */
+public class PayloadCreation implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final boolean shouldCombine;
+ private final Constructor<?> constructor;
+ private final String preCombineField;
+
+ private PayloadCreation(
+ boolean shouldCombine,
+ Constructor<?> constructor,
+ @Nullable String preCombineField) {
+ this.shouldCombine = shouldCombine;
+ this.constructor = constructor;
+ this.preCombineField = preCombineField;
+ }
+
+ public static PayloadCreation instance(Configuration conf) throws Exception {
+ boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+ ||
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) ==
WriteOperationType.UPSERT;
+ String preCombineField = null;
+ final Class<?>[] argTypes;
+ final Constructor<?> constructor;
+ if (shouldCombine) {
+ preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
+ argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
+ } else {
+ argTypes = new Class<?>[] {Option.class};
+ }
+ final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS);
+ constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes);
+ return new PayloadCreation(shouldCombine, constructor, preCombineField);
+ }
+
+ public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean
isDelete) throws Exception {
+ if (shouldCombine) {
+ ValidationUtils.checkState(preCombineField != null);
+ Comparable<?> orderingVal = (Comparable<?>)
HoodieAvroUtils.getNestedFieldVal(record,
+ preCombineField, false);
+ return (HoodieRecordPayload<?>) constructor.newInstance(
+ isDelete ? null : record, orderingVal);
+ } else {
+ return (HoodieRecordPayload<?>)
this.constructor.newInstance(Option.of(record));
+ }
+ }
+
+ public HoodieRecordPayload<?> createDeletePayload(BaseAvroPayload payload)
throws Exception {
+ if (shouldCombine) {
+ return (HoodieRecordPayload<?>) constructor.newInstance(null,
payload.orderingVal);
+ } else {
+ return (HoodieRecordPayload<?>)
this.constructor.newInstance(Option.empty());
+ }
+ }
+}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 41c587c..af87070 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -393,6 +393,56 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
}
@Test
+ void testWriteGlobalIndex() {
+ // the source generates 4 commits
+ String createSource = TestConfigurations.getFileSourceDDL(
+ "source", "test_source_4.data", 4);
+ streamTableEnv.executeSql(createSource);
+
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1",
options);
+ streamTableEnv.executeSql(hoodieTableDDL);
+
+ final String insertInto2 = "insert into t1 select * from source";
+
+ execInsertSql(streamTableEnv, insertInto2);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result, "[id1,Phoebe,52,1970-01-01T00:00:08,par4]");
+ }
+
+ @Test
+ void testWriteLocalIndex() {
+ // the source generates 4 commits
+ String createSource = TestConfigurations.getFileSourceDDL(
+ "source", "test_source_4.data", 4);
+ streamTableEnv.executeSql(createSource);
+
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false");
+ options.put(FlinkOptions.INSERT_DROP_DUPS.key(), "true");
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1",
options);
+ streamTableEnv.executeSql(hoodieTableDDL);
+
+ final String insertInto2 = "insert into t1 select * from source";
+
+ execInsertSql(streamTableEnv, insertInto2);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "["
+ + "id1,Stephen,34,1970-01-01T00:00:02,par1, "
+ + "id1,Fabian,32,1970-01-01T00:00:04,par2, "
+ + "id1,Jane,19,1970-01-01T00:00:06,par3, "
+ + "id1,Phoebe,52,1970-01-01T00:00:08,par4]";
+ assertRowsEquals(result, expected, 3);
+ }
+
+ @Test
void testStreamReadEmptyTablePath() throws Exception {
// create an empty table
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 4a2466c..bb67661 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -256,8 +256,20 @@ public class TestData {
* @param expected Expected string of the sorted rows
*/
public static void assertRowsEquals(List<Row> rows, String expected) {
+ assertRowsEquals(rows, expected, 0);
+ }
+
+ /**
+ * Sort the {@code rows} using field at index {@code orderingPos} and asserts
+ * it equals with the expected string {@code expected}.
+ *
+ * @param rows Actual result rows
+ * @param expected Expected string of the sorted rows
+ * @param orderingPos Field position for ordering
+ */
+ public static void assertRowsEquals(List<Row> rows, String expected, int
orderingPos) {
String rowsString = rows.stream()
- .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
+ .sorted(Comparator.comparing(o ->
toStringSafely(o.getField(orderingPos))))
.collect(Collectors.toList()).toString();
assertThat(rowsString, is(expected));
}
diff --git a/hudi-flink/src/test/resources/test_source_4.data
b/hudi-flink/src/test/resources/test_source_4.data
new file mode 100644
index 0000000..1ed4d19
--- /dev/null
+++ b/hudi-flink/src/test/resources/test_source_4.data
@@ -0,0 +1,8 @@
+{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01",
"partition": "par1"}
+{"uuid": "id1", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02",
"partition": "par1"}
+{"uuid": "id1", "name": "Julian", "age": 54, "ts": "1970-01-01T00:00:03",
"partition": "par2"}
+{"uuid": "id1", "name": "Fabian", "age": 32, "ts": "1970-01-01T00:00:04",
"partition": "par2"}
+{"uuid": "id1", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05",
"partition": "par3"}
+{"uuid": "id1", "name": "Jane", "age": 19, "ts": "1970-01-01T00:00:06",
"partition": "par3"}
+{"uuid": "id1", "name": "Ella", "age": 38, "ts": "1970-01-01T00:00:07",
"partition": "par4"}
+{"uuid": "id1", "name": "Phoebe", "age": 52, "ts": "1970-01-01T00:00:08",
"partition": "par4"}