This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 50c2b76 Revert "[HUDI-2087] Support Append only in Flink stream
(#3252)"
50c2b76 is described below
commit 50c2b76d725a71608a38217370b1ac45cedae405
Author: yuzhao.cyz <[email protected]>
AuthorDate: Fri Jul 16 18:05:33 2021 +0800
Revert "[HUDI-2087] Support Append only in Flink stream (#3252)"
This reverts commit 783c9cb3
---
.../hudi/table/action/commit/BucketType.java | 2 +-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 10 +--
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 3 +-
.../java/org/apache/hudi/io/FlinkCreateHandle.java | 12 ---
.../commit/BaseFlinkCommitActionExecutor.java | 5 +-
.../apache/hudi/configuration/FlinkOptions.java | 6 --
.../sink/partitioner/BucketAssignFunction.java | 44 +++-------
.../hudi/sink/partitioner/BucketAssigner.java | 10 +--
.../apache/hudi/streamer/FlinkStreamerConfig.java | 11 +--
.../org/apache/hudi/table/HoodieTableFactory.java | 6 --
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 93 ++--------------------
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 15 ----
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 20 +----
.../test/java/org/apache/hudi/utils/TestData.java | 33 --------
14 files changed, 28 insertions(+), 242 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
index e1fd161..70ee473 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
@@ -19,5 +19,5 @@
package org.apache.hudi.table.action.commit;
public enum BucketType {
- UPDATE, INSERT, APPEND_ONLY
+ UPDATE, INSERT
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 71ca1b6..05e4481 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -56,7 +56,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
@@ -409,12 +408,6 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
- // append only mode always use FlinkCreateHandle
- if (loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
- return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
- fileID, table.getTaskContextSupplier());
- }
-
if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle)
bucketToHandles.get(fileID);
if (lastHandle.shouldReplace()) {
@@ -431,8 +424,7 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table,
partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
- } else if (loc.getInstantTime().equals(BucketType.INSERT.name()) ||
loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
- // use the same handle for insert bucket
+ } else if (loc.getInstantTime().equals("I")) {
writeHandle = new FlinkCreateHandle<>(config, instantTime, table,
partitionPath,
fileID, table.getTaskContextSupplier());
} else {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 41d0666..987f335 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
-import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -81,7 +80,7 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload,
I, K, O>
@Override
protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
return hoodieRecord.getCurrentLocation() != null
- &&
hoodieRecord.getCurrentLocation().getInstantTime().equals(BucketType.UPDATE.name());
+ && hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
}
@Override
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index 49d8918..3ff579f 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -30,7 +30,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -71,17 +70,6 @@ public class FlinkCreateHandle<T extends
HoodieRecordPayload, I, K, O>
}
}
- @Override
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- // In some rare cases, the task was pulled up again with same write file
name,
- // for e.g, reuse the small log files from last commit instant.
-
- // Just skip the marker file creation if it already exists, the new data
would append to
- // the file directly.
- MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
- markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
- }
-
/**
* The flink checkpoints start in sequence and asynchronously, when one
write task finish the checkpoint(A)
* (thus the fs view got the written data files some of which may be
invalid),
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 7dcc240..5cfd28b 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -102,7 +102,9 @@ public abstract class BaseFlinkCommitActionExecutor<T
extends HoodieRecordPayloa
final HoodieRecord<?> record = inputRecords.get(0);
final String partitionPath = record.getPartitionPath();
final String fileId = record.getCurrentLocation().getFileId();
- final BucketType bucketType =
BucketType.valueOf(record.getCurrentLocation().getInstantTime());
+ final BucketType bucketType =
record.getCurrentLocation().getInstantTime().equals("I")
+ ? BucketType.INSERT
+ : BucketType.UPDATE;
handleUpsertPartition(
instantTime,
partitionPath,
@@ -183,7 +185,6 @@ public abstract class BaseFlinkCommitActionExecutor<T
extends HoodieRecordPayloa
} else {
switch (bucketType) {
case INSERT:
- case APPEND_ONLY:
return handleInsert(fileIdHint, recordItr);
case UPDATE:
return handleUpdate(partitionPath, fileIdHint, recordItr);
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index d975502..fc29020 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -187,12 +187,6 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
.withDescription("Type of table to write. COPY_ON_WRITE (or)
MERGE_ON_READ");
- public static final ConfigOption<Boolean> APPEND_ONLY_ENABLE = ConfigOptions
- .key("append_only.enable")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to write data to new baseFile without
index, only support in COW, default false");
-
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
.stringType()
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 5cc239b..75a3454 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -28,13 +28,11 @@ import
org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
-import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -57,9 +55,9 @@ import java.util.Objects;
* it then assigns the bucket with ID using the {@link BucketAssigner}.
*
* <p>All the records are tagged with HoodieRecordLocation, instead of real
instant time,
- * INSERT record uses "INSERT" and UPSERT record uses "UPDATE" as instant
time. There is no need to keep
+ * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is
no need to keep
* the "real" instant time for each record, the bucket ID (partition path &
fileID) actually decides
- * where the record should write to. The "INSERT" and "UPDATE" tags are only
used for downstream to decide whether
+ * where the record should write to. The "I" and "U" tags are only used for
downstream to decide whether
* the data bucket is an INSERT or an UPSERT, we should factor the tags out
when the underneath writer
* supports specifying the bucket type explicitly.
*
@@ -108,18 +106,11 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
*/
private final boolean globalIndex;
- private final boolean appendOnly;
-
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
- this.appendOnly = conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE);
- if (appendOnly) {
-
ValidationUtils.checkArgument(conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()),
- "APPEND_ONLY mode only support in COPY_ON_WRITE table");
- }
}
@Override
@@ -179,33 +170,25 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
- if (appendOnly) {
- location = getNewRecordLocation(partitionPath);
- this.context.setCurrentKey(recordKey);
- record.setCurrentLocation(location);
- out.collect((O) record);
- return;
- }
-
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
HoodieRecordGlobalLocation oldLoc = indexState.value();
if (isChangingRecords && oldLoc != null) {
- // Set up the instant time as "UPDATE" to mark the bucket as an update
bucket.
+ // Set up the instant time as "U" to mark the bucket as an update bucket.
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
if (globalIndex) {
// if partition path changes, emit a delete record for old partition
path,
// then update the index state using location with new partition
path.
HoodieRecord<?> deleteRecord = new HoodieRecord<>(new
HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload)
record.getData()));
-
deleteRecord.setCurrentLocation(oldLoc.toLocal(BucketType.UPDATE.name()));
+ deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
location = getNewRecordLocation(partitionPath);
updateIndexState(partitionPath, location);
} else {
- location = oldLoc.toLocal(BucketType.UPDATE.name());
+ location = oldLoc.toLocal("U");
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
@@ -220,26 +203,17 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
}
private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
- BucketInfo bucketInfo;
- if (appendOnly) {
- bucketInfo = this.bucketAssigner.addAppendOnly(partitionPath);
- } else {
- bucketInfo = this.bucketAssigner.addInsert(partitionPath);
- }
-
+ final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
final HoodieRecordLocation location;
switch (bucketInfo.getBucketType()) {
case INSERT:
- // This is an insert bucket, use HoodieRecordLocation instant time as
"INSERT".
+ // This is an insert bucket, use HoodieRecordLocation instant time as
"I".
// Downstream operators can then check the instant time to know whether
// a record belongs to an insert bucket.
- location = new HoodieRecordLocation(BucketType.INSERT.name(),
bucketInfo.getFileIdPrefix());
+ location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
break;
case UPDATE:
- location = new HoodieRecordLocation(BucketType.UPDATE.name(),
bucketInfo.getFileIdPrefix());
- break;
- case APPEND_ONLY:
- location = new HoodieRecordLocation(BucketType.APPEND_ONLY.name(),
bucketInfo.getFileIdPrefix());
+ location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
break;
default:
throw new AssertionError();
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index 965b557..6d805ce 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -140,14 +140,6 @@ public class BucketAssigner implements AutoCloseable {
}
// if we have anything more, create new insert buckets, like normal
- return getOrCreateNewFileBucket(partitionPath, BucketType.INSERT);
- }
-
- public BucketInfo addAppendOnly(String partitionPath) {
- return getOrCreateNewFileBucket(partitionPath, BucketType.APPEND_ONLY);
- }
-
- private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType
bucketType) {
if (newFileAssignStates.containsKey(partitionPath)) {
NewFileAssignState newFileAssignState =
newFileAssignStates.get(partitionPath);
if (newFileAssignState.canAssign()) {
@@ -156,7 +148,7 @@ public class BucketAssigner implements AutoCloseable {
final String key = StreamerUtil.generateBucketKey(partitionPath,
newFileAssignState.fileId);
return bucketInfoMap.get(key);
}
- BucketInfo bucketInfo = new BucketInfo(bucketType,
FSUtils.createNewFileIdPfx(), partitionPath);
+ BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT,
FSUtils.createNewFileIdPfx(), partitionPath);
final String key = StreamerUtil.generateBucketKey(partitionPath,
bucketInfo.getFileIdPrefix());
bucketInfoMap.put(key, bucketInfo);
newFileAssignStates.put(partitionPath, new
NewFileAssignState(bucketInfo.getFileIdPrefix(),
writeProfile.getRecordsPerBucket()));
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 8229ffa..92d976f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -69,9 +69,6 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--table-type"}, description = "Type of table.
COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
public String tableType;
- @Parameter(names = {"--append-only"}, description = "Write data to new
parquet in every checkpoint. Only support in COPY_ON_WRITE table.", required =
true)
- public Boolean appendOnly = false;
-
@Parameter(names = {"--props"}, description = "Path to properties file on
localfs or dfs, with configurations for "
+ "hoodie client, schema provider, key generator and data source. For
hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics
endpoints, hive configs etc. For sources, refer"
@@ -305,13 +302,7 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
- conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, config.appendOnly);
- if (config.appendOnly) {
- // append only should use insert operation
- conf.setString(FlinkOptions.OPERATION,
WriteOperationType.INSERT.value());
- } else {
- conf.setString(FlinkOptions.OPERATION, config.operation.value());
- }
+ conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 695fe00..753ced4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -145,11 +144,6 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
TableSchema schema) {
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
- // append only
- if (conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE)) {
- // append only should use insert operation
- conf.setString(FlinkOptions.OPERATION,
WriteOperationType.INSERT.value());
- }
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// compaction options
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 4344f45..e145076 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
@@ -45,7 +44,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -389,12 +387,12 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch
size
+ conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch
size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
// open the function and ingest data
funcWrapper.openFunction();
- // Each record is 216 bytes. so 4 records expect to trigger a mini-batch
write
+ // Each record is 208 bytes. so 4 records expect to trigger a mini-batch
write
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
}
@@ -450,13 +448,13 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithDeduplication() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch
size
+ conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch
size
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
// open the function and ingest data
funcWrapper.openFunction();
- // Each record is 216 bytes. so 4 records expect to trigger a mini-batch
write
+ // Each record is 208 bytes. so 4 records expect to trigger a mini-batch
write
for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
funcWrapper.invoke(rowData);
}
@@ -512,91 +510,14 @@ public class TestWriteCopyOnWrite {
}
@Test
- public void testAppendOnly() throws Exception {
- // reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0007); // 734 bytes batch
size
- conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, false);
- conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
- conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
-
- // open the function and ingest data
- funcWrapper.openFunction();
- // Each record is 216 bytes. so 4 records expect to trigger a mini-batch
write
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
- funcWrapper.invoke(rowData);
- }
-
- // this triggers the data write and event send
- funcWrapper.checkpointFunction(1);
- Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
- assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
- final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the
first event first
- final OperatorEvent event2 = funcWrapper.getNextEvent();
- assertThat("The operator expect to send an event", event2,
instanceOf(WriteMetadataEvent.class));
-
- funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
- assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the
event");
-
- String instant = funcWrapper.getWriteClient()
- .getLastPendingInstant(getTableType());
-
- funcWrapper.checkpointComplete(1);
-
- Map<String, List<String>> expected = new HashMap<>();
-
- expected.put("par1", Arrays.asList(
- "id1,par1,id1,Danny,23,0,par1",
- "id1,par1,id1,Danny,23,1,par1",
- "id1,par1,id1,Danny,23,2,par1",
- "id1,par1,id1,Danny,23,3,par1",
- "id1,par1,id1,Danny,23,4,par1"));
-
- TestData.checkWrittenAllData(tempFile, expected, 1);
-
- // started a new instant already
- checkInflightInstant(funcWrapper.getWriteClient());
- checkInstantState(funcWrapper.getWriteClient(),
HoodieInstant.State.COMPLETED, instant);
-
- // insert duplicates again
- for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
- funcWrapper.invoke(rowData);
- }
-
- funcWrapper.checkpointFunction(2);
-
- final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the
first event first
- final OperatorEvent event4 = funcWrapper.getNextEvent();
- funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
- funcWrapper.checkpointComplete(2);
-
- // Same the original base file content.
- expected.put("par1", Arrays.asList(
- "id1,par1,id1,Danny,23,0,par1",
- "id1,par1,id1,Danny,23,0,par1",
- "id1,par1,id1,Danny,23,1,par1",
- "id1,par1,id1,Danny,23,1,par1",
- "id1,par1,id1,Danny,23,2,par1",
- "id1,par1,id1,Danny,23,2,par1",
- "id1,par1,id1,Danny,23,3,par1",
- "id1,par1,id1,Danny,23,3,par1",
- "id1,par1,id1,Danny,23,4,par1",
- "id1,par1,id1,Danny,23,4,par1"));
- TestData.checkWrittenAllData(tempFile, expected, 1);
- }
-
- @Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0007); // 734 bytes
buffer size
+ conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes
buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
// open the function and ingest data
funcWrapper.openFunction();
- // each record is 216 bytes. so 4 records expect to trigger buffer flush:
+ // each record is 208 bytes. so 4 records expect to trigger buffer flush:
// flush the max size bucket once at a time.
for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
funcWrapper.invoke(rowData);
@@ -739,7 +660,7 @@ public class TestWriteCopyOnWrite {
public void testWriteExactlyOnce() throws Exception {
// reset the config option
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 3);
- conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0007); // 734 bytes
buffer size
+ conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes
buffer size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
// open the function and ingest data
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index b983e8c..07e23b5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -23,14 +23,12 @@ import
org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
@@ -39,7 +37,6 @@ import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Comparator;
@@ -47,8 +44,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
/**
* Test cases for delta stream write.
*/
@@ -91,16 +86,6 @@ public class TestWriteMergeOnRead extends
TestWriteCopyOnWrite {
return EXPECTED1;
}
- @Test
- public void testAppendOnly() throws Exception {
- conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
- conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
- assertThrows(IllegalArgumentException.class, () -> {
- funcWrapper.openFunction();
- }, "APPEND_ONLY mode only support in COPY_ON_WRITE table");
- }
-
protected Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
// MOR mode merges the messages with the same key.
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index bd6d3e3..13a71ec 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -19,18 +19,15 @@
package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
/**
* Test cases for delta stream write with compaction.
*/
@@ -42,19 +39,10 @@ public class TestWriteMergeOnReadWithCompact extends
TestWriteCopyOnWrite {
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
}
- @Override
- protected Map<String, String> getExpectedBeforeCheckpointComplete() {
- return EXPECTED1;
- }
-
+ @Disabled
@Test
- public void testAppendOnly() throws Exception {
- conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
- conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
- funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
- assertThrows(IllegalArgumentException.class, () -> {
- funcWrapper.openFunction();
- }, "APPEND_ONLY mode only support in COPY_ON_WRITE table");
+ public void testIndexStateBootstrap() {
+ // Ignore the index bootstrap because we only support parquet load now.
}
protected Map<String, String> getMiniBatchExpected() {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 2de4859..50ecf54 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -360,10 +360,8 @@ public class TestData {
assert baseFile.isDirectory();
FileFilter filter = file -> !file.getName().startsWith(".");
File[] partitionDirs = baseFile.listFiles(filter);
-
assertNotNull(partitionDirs);
assertThat(partitionDirs.length, is(partitions));
-
for (File partitionDir : partitionDirs) {
File[] dataFiles = partitionDir.listFiles(filter);
assertNotNull(dataFiles);
@@ -383,37 +381,6 @@ public class TestData {
}
}
- public static void checkWrittenAllData(
- File baseFile,
- Map<String, List<String>> expected,
- int partitions) throws IOException {
- assert baseFile.isDirectory();
- FileFilter filter = file -> !file.getName().startsWith(".");
- File[] partitionDirs = baseFile.listFiles(filter);
-
- assertNotNull(partitionDirs);
- assertThat(partitionDirs.length, is(partitions));
-
- for (File partitionDir : partitionDirs) {
- File[] dataFiles = partitionDir.listFiles(filter);
- assertNotNull(dataFiles);
-
- List<String> readBuffer = new ArrayList<>();
- for (File dataFile : dataFiles) {
- ParquetReader<GenericRecord> reader = AvroParquetReader
- .<GenericRecord>builder(new
Path(dataFile.getAbsolutePath())).build();
- GenericRecord nextRecord = reader.read();
- while (nextRecord != null) {
- readBuffer.add(filterOutVariables(nextRecord));
- nextRecord = reader.read();
- }
- readBuffer.sort(Comparator.naturalOrder());
- }
-
- assertThat(readBuffer, is(expected.get(partitionDir.getName())));
- }
- }
-
/**
* Checks the source data are written as expected.
*