This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b3fab720d [core] Introduce Index BulkLoading for cross partition update (#2154)
b3fab720d is described below
commit b3fab720d2d524fc0255472a3552c637ecbeb09c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 20 11:13:28 2023 +0800
[core] Introduce Index BulkLoading for cross partition update (#2154)
---
.../paimon/sort/BinaryInMemorySortBuffer.java | 4 +-
.../org/apache/paimon/table/TableTestBase.java | 2 +-
.../apache/paimon/flink/lookup/RocksDBState.java | 6 +-
.../paimon/flink/lookup/RocksDBStateFactory.java | 53 +++++++-
.../paimon/flink/lookup/RocksDBValueState.java | 4 +-
.../flink/sink/index/GlobalIndexAssigner.java | 143 +++++++++++++++++----
.../sink/index/GlobalIndexAssignerOperator.java | 15 ++-
.../flink/GlobalDynamicBucketTableITCase.java | 1 +
.../flink/sink/index/GlobalIndexAssignerTest.java | 69 ++++++----
9 files changed, 234 insertions(+), 63 deletions(-)
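
For context, the core technique in this commit is RocksDB's external-file ingestion: instead of issuing one put() per bootstrap record, the assigner writes sorted records into SST files and ingests them in bulk. A minimal standalone sketch of that pattern (the directory, file name, and keys are placeholders, not part of this commit):

    import org.rocksdb.EnvOptions;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    import java.util.Collections;

    public class SstBulkLoadSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                    EnvOptions envOptions = new EnvOptions();
                    RocksDB db = RocksDB.open(options, "/tmp/bulk-load-demo")) {
                String sst = "/tmp/bulk-load-demo/demo.sst";
                try (SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                    writer.open(sst);
                    // SstFileWriter requires ascending key order, which is why
                    // this commit sorts bootstrap records before loading them.
                    writer.put("k1".getBytes(), "v1".getBytes());
                    writer.put("k2".getBytes(), "v2".getBytes());
                    writer.finish();
                }
                // Ingest the finished file directly into the LSM tree.
                db.ingestExternalFile(
                        Collections.singletonList(sst), new IngestExternalFileOptions());
            }
        }
    }
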
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
index 777effe55..2fe3ebdba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
@@ -244,7 +244,9 @@ public class BinaryInMemorySortBuffer extends BinaryIndexedSortable implements S
@Override
public final MutableObjectIterator<BinaryRow> sortedIterator() {
- new QuickSort().sort(this);
+ if (numRecords > 0) {
+ new QuickSort().sort(this);
+ }
return iterator();
}
}
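
This guard matters for the new bootstrap path, where the sort buffer can legitimately hold zero records (a table with no pre-existing index). A quick illustration, mirroring the buffer construction that GlobalIndexAssigner uses later in this diff (generated-code names and sizes here are arbitrary choices for the sketch):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.data.serializer.InternalRowSerializer;
    import org.apache.paimon.memory.HeapMemorySegmentPool;
    import org.apache.paimon.sort.BinaryInMemorySortBuffer;
    import org.apache.paimon.types.DataTypes;

    import java.util.Collections;

    import static org.apache.paimon.codegen.CodeGenUtils.newNormalizedKeyComputer;
    import static org.apache.paimon.codegen.CodeGenUtils.newRecordComparator;

    public class EmptySortBufferSketch {
        public static void main(String[] args) throws Exception {
            BinaryInMemorySortBuffer buffer =
                    BinaryInMemorySortBuffer.createBuffer(
                            newNormalizedKeyComputer(
                                    Collections.singletonList(DataTypes.BYTES()), "sketchKey"),
                            new InternalRowSerializer(DataTypes.BYTES(), DataTypes.BYTES()),
                            newRecordComparator(
                                    Collections.singletonList(DataTypes.BYTES()), "sketchCmp"),
                            new HeapMemorySegmentPool(1024 * 1024, 32 * 1024));
            // No records were written: sortedIterator() now skips QuickSort and
            // returns an immediately exhausted iterator instead of sorting zero rows.
            System.out.println(buffer.sortedIterator().next(new BinaryRow(2)) == null); // true
        }
    }
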
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 0e597b815..5465d5c4b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -71,7 +71,7 @@ public abstract class TableTestBase {
protected Path warehouse;
protected Catalog catalog;
protected String database;
- @TempDir java.nio.file.Path tempPath;
+ @TempDir public java.nio.file.Path tempPath;
@BeforeEach
public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
index 784b61301..3632069d7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
@@ -77,7 +77,11 @@ public abstract class RocksDBState<K, V, CacheV> {
.build();
}
- protected byte[] serializeKey(K key) throws IOException {
+ public ColumnFamilyHandle columnFamily() {
+ return columnFamily;
+ }
+
+ public byte[] serializeKey(K key) throws IOException {
keyOutView.clear();
keySerializer.serialize(key, keyOutView);
return keyOutView.getCopyOfBuffer();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
index 9c930f464..757d08776 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
@@ -21,31 +21,41 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.flink.RocksDBOptions;
+import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileWriter;
import org.rocksdb.TtlDB;
import javax.annotation.Nullable;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
/** Factory to create state. */
public class RocksDBStateFactory implements Closeable {
public static final String MERGE_OPERATOR_NAME = "stringappendtest";
- private RocksDB db;
-
+ private final Options options;
+ private final String path;
private final ColumnFamilyOptions columnFamilyOptions;
+ private RocksDB db;
+ private int sstIndex = 0;
+
public RocksDBStateFactory(
String path, org.apache.paimon.options.Options conf, @Nullable Duration ttlSecs)
throws IOException {
@@ -56,11 +66,12 @@ public class RocksDBStateFactory implements Closeable {
.setStatsDumpPeriodSec(0)
.setCreateIfMissing(true),
conf);
+ this.path = path;
this.columnFamilyOptions =
RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), conf)
.setMergeOperatorName(MERGE_OPERATOR_NAME);
- Options options = new Options(dbOptions, columnFamilyOptions);
+ this.options = new Options(dbOptions, columnFamilyOptions);
try {
this.db =
ttlSecs == null
@@ -71,6 +82,42 @@ public class RocksDBStateFactory implements Closeable {
}
}
+ public void bulkLoad(ColumnFamilyHandle columnFamily, KeyValueIterator<byte[], byte[]> iterator)
+ throws RocksDBException, IOException {
+ long targetFileSize = options.targetFileSizeBase();
+
+ List<String> files = new ArrayList<>();
+ SstFileWriter writer = null;
+ long recordNum = 0;
+ while (iterator.advanceNext()) {
+ byte[] key = iterator.getKey();
+ byte[] value = iterator.getValue();
+
+ if (writer == null) {
+ writer = new SstFileWriter(new EnvOptions(), options);
+ String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
+ writer.open(path);
+ files.add(path);
+ }
+
+ writer.put(key, value);
+ recordNum++;
+ if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
+ writer.finish();
+ writer = null;
+ recordNum = 0;
+ }
+ }
+
+ if (writer != null) {
+ writer.finish();
+ }
+
+ if (files.size() > 0) {
+ db.ingestExternalFile(columnFamily, files, new IngestExternalFileOptions());
+ }
+ }
+
public <K, V> RocksDBValueState<K, V> valueState(
String name,
Serializer<K> keySerializer,
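
Note that bulkLoad expects its input already sorted in ascending key order, since SstFileWriter rejects out-of-order puts. A hypothetical adapter (the helper name and the TreeMap source are illustrative, not part of this commit) showing the KeyValueIterator shape that bulkLoad consumes:

    import org.apache.flink.table.runtime.util.KeyValueIterator;

    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;

    public class SortedMapIteratorSketch {
        // Wrap a TreeMap (String keys keep the ordering simple) in the
        // KeyValueIterator shape that bulkLoad(...) consumes. In the real
        // code path the ordering comes from an external sort buffer instead.
        static KeyValueIterator<byte[], byte[]> fromSortedMap(TreeMap<String, String> map) {
            Iterator<Map.Entry<String, String>> entries = map.entrySet().iterator();
            return new KeyValueIterator<byte[], byte[]>() {
                private Map.Entry<String, String> current;

                @Override
                public boolean advanceNext() {
                    if (entries.hasNext()) {
                        current = entries.next();
                        return true;
                    }
                    return false;
                }

                @Override
                public byte[] getKey() {
                    return current.getKey().getBytes();
                }

                @Override
                public byte[] getValue() {
                    return current.getValue().getBytes();
                }
            };
        }
    }

A caller would then pass it together with the target state's column family, e.g. factory.bulkLoad(state.columnFamily(), fromSortedMap(map)).
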
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
index 077906403..a12855d88 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
@@ -88,12 +88,12 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref
}
}
- private V deserializeValue(byte[] valueBytes) throws IOException {
+ public V deserializeValue(byte[] valueBytes) throws IOException {
valueInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(valueInputView);
}
- private byte[] serializeValue(V value) throws IOException {
+ public byte[] serializeValue(V value) throws IOException {
valueOutputView.clear();
valueSerializer.serialize(value, valueOutputView);
return valueOutputView.getCopyOfBuffer();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
index d35ef6ed4..545b0cbd3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -20,41 +20,63 @@ package org.apache.paimon.flink.sink.index;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.RocksDBOptions;
import org.apache.paimon.flink.lookup.RocksDBStateFactory;
import org.apache.paimon.flink.lookup.RocksDBValueState;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.sort.BinaryInMemorySortBuffer;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.IDMapping;
+import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PositiveIntInt;
import org.apache.paimon.utils.PositiveIntIntSerializer;
import org.apache.paimon.utils.SerBiFunction;
import org.apache.paimon.utils.SerializableFunction;
+import org.apache.flink.table.runtime.util.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.BiConsumer;
+import static org.apache.paimon.codegen.CodeGenUtils.newNormalizedKeyComputer;
+import static org.apache.paimon.codegen.CodeGenUtils.newRecordComparator;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Assign UPDATE_BEFORE and bucket for the input record, output record with bucket. */
-public class GlobalIndexAssigner<T> implements Serializable {
+public class GlobalIndexAssigner<T> implements Serializable, Closeable {
private static final long serialVersionUID = 1L;
+ private static final String INDEX_NAME = "keyIndex";
+
private final AbstractFileStoreTable table;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
@@ -63,6 +85,9 @@ public class GlobalIndexAssigner<T> implements Serializable {
private final SerBiFunction<T, BinaryRow, T> setPartition;
private final SerBiFunction<T, RowKind, T> setRowKind;
+ private transient boolean bootstrap;
+ private transient BinaryExternalSortBuffer bootstrapBuffer;
+
private transient int targetBucketRowNumber;
private transient int assignId;
private transient BiConsumer<T, Integer> collector;
@@ -92,7 +117,14 @@ public class GlobalIndexAssigner<T> implements Serializable {
this.setRowKind = setRowKind;
}
- public void open(File tmpDir, int numAssigners, int assignId, BiConsumer<T, Integer> collector)
+ // ================== Start Public API ===================
+
+ public void open(
+ IOManager ioManager,
+ File tmpDir,
+ int numAssigners,
+ int assignId,
+ BiConsumer<T, Integer> collector)
throws Exception {
this.numAssigners = numAssigners;
this.assignId = assignId;
@@ -113,7 +145,7 @@ public class GlobalIndexAssigner<T> implements Serializable {
RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
this.keyIndex =
stateFactory.valueState(
- "keyIndex",
+ INDEX_NAME,
new RowCompactedSerializer(keyType),
new PositiveIntIntSerializer(),
options.get(RocksDBOptions.LOOKUP_CACHE_ROWS));
@@ -121,9 +153,81 @@ public class GlobalIndexAssigner<T> implements Serializable {
this.partMapping = new IDMapping<>(BinaryRow::copy);
this.bucketAssigner = new BucketAssigner();
this.existsAction = fromMergeEngine(coreOptions.mergeEngine());
+
+ // create bootstrap sort buffer
+ this.bootstrap = true;
+ int pageSize = coreOptions.pageSize();
+ long bufferSize = coreOptions.writeBufferSize() / 2;
+ RecordComparator comparator =
+ newRecordComparator(
+ Collections.singletonList(DataTypes.BYTES()), "binary_comparator");
+ BinaryInMemorySortBuffer sortBuffer =
+ BinaryInMemorySortBuffer.createBuffer(
+ newNormalizedKeyComputer(
+ Collections.singletonList(DataTypes.BYTES()),
+ "binary_normalized_key"),
+ new InternalRowSerializer(DataTypes.BYTES(), DataTypes.BYTES()),
+ comparator,
+ new HeapMemorySegmentPool(bufferSize, pageSize));
+ this.bootstrapBuffer =
+ new BinaryExternalSortBuffer(
+ new BinaryRowSerializer(2),
+ comparator,
+ pageSize,
+ sortBuffer,
+ ioManager,
+ coreOptions.localSortMaxNumFileHandles());
}
- public void process(T value) throws Exception {
+ public void bootstrap(T value) throws IOException {
+ checkArgument(bootstrap);
+ BinaryRow partition = keyPartExtractor.partition(value);
+ BinaryRow key = keyPartExtractor.trimmedPrimaryKey(value);
+ int partId = partMapping.index(partition);
+ int bucket = bootstrapBucketFunction.apply(value);
+ bucketAssigner.bootstrapBucket(partition, bucket);
+ PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket);
+ bootstrapBuffer.write(
+ GenericRow.of(keyIndex.serializeKey(key), keyIndex.serializeValue(partAndBucket)));
+ }
+
+ public void endBoostrap() throws IOException, RocksDBException {
+ bootstrap = false;
+ MutableObjectIterator<BinaryRow> iterator = bootstrapBuffer.sortedIterator();
+ BinaryRow row = new BinaryRow(2);
+ KeyValueIterator<byte[], byte[]> kvIter =
+ new KeyValueIterator<byte[], byte[]>() {
+
+ private BinaryRow current;
+
+ @Override
+ public boolean advanceNext() {
+ try {
+ current = iterator.next(row);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return current != null;
+ }
+
+ @Override
+ public byte[] getKey() {
+ return current.getBinary(0);
+ }
+
+ @Override
+ public byte[] getValue() {
+ return current.getBinary(1);
+ }
+ };
+
+ stateFactory.bulkLoad(keyIndex.columnFamily(), kvIter);
+ bootstrapBuffer.clear();
+ bootstrapBuffer = null;
+ }
+
+ public void processInput(T value) throws Exception {
+ checkArgument(!bootstrap);
BinaryRow partition = extractor.partition(value);
BinaryRow key = extractor.trimmedPrimaryKey(value);
@@ -168,16 +272,20 @@ public class GlobalIndexAssigner<T> implements Serializable {
}
}
- public void bootstrap(T value) throws IOException {
- BinaryRow partition = keyPartExtractor.partition(value);
- BinaryRow key = keyPartExtractor.trimmedPrimaryKey(value);
- int partId = partMapping.index(partition);
- int bucket = bootstrapBucketFunction.apply(value);
- bucketAssigner.bootstrapBucket(partition, bucket);
- PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket);
- keyIndex.put(key, partAndBucket);
+ @Override
+ public void close() throws IOException {
+ if (stateFactory != null) {
+ stateFactory.close();
+ stateFactory = null;
+ }
+
+ if (path != null) {
+ FileIOUtils.deleteDirectoryQuietly(path);
+ }
}
+ // ================== End Public API ===================
+
private void processNewRecord(BinaryRow partition, int partId, BinaryRow key, T value)
throws IOException {
int bucket = assignBucket(partition);
@@ -201,17 +309,6 @@ public class GlobalIndexAssigner<T> implements Serializable {
collector.accept(value, bucket);
}
- public void close() throws IOException {
- if (stateFactory != null) {
- stateFactory.close();
- stateFactory = null;
- }
-
- if (path != null) {
- FileIOUtils.deleteDirectoryQuietly(path);
- }
- }
-
private static class BucketAssigner {
private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>();
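
Taken together, the assigner now has an explicit three-phase lifecycle: open, bootstrap until endBoostrap() performs the bulk load, then process input. A hedged sketch of the expected call order (construction of the assigner is elided since it needs a table and extractor functions; directory names are placeholders):

    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.disk.IOManager;
    import org.apache.paimon.flink.sink.index.GlobalIndexAssigner;

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    public class AssignerLifecycleSketch {
        static void run(GlobalIndexAssigner<InternalRow> assigner) throws Exception {
            List<Integer> buckets = new ArrayList<>();
            File tmpDir = new File("/tmp/assigner-sketch");
            assigner.open(
                    IOManager.create(new File(tmpDir, "io").getPath()),
                    tmpDir,
                    /* numAssigners */ 1,
                    /* assignId */ 0,
                    (row, bucket) -> buckets.add(bucket));

            // Phase 1: bootstrap. Index entries go into the external sort
            // buffer, not into RocksDB one put() at a time.
            assigner.bootstrap(GenericRow.of(1, 1, 1));

            // Phase 2: sort, write SST files, ingest them in one shot.
            assigner.endBoostrap();

            // Phase 3: normal record processing against the loaded index.
            assigner.processInput(GenericRow.of(1, 2, 2));

            assigner.close();
        }
    }

GlobalIndexAssignerOperator below follows exactly this order: it buffers bootstrap records, calls endBoostrap(), and only then replays the buffered rows through processInput().
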
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 8e0e08799..c04eb13cf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -71,21 +71,23 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager =
+ org.apache.flink.runtime.io.disk.iomanager.IOManager flinkIoManager =
getContainingTask().getEnvironment().getIOManager();
- File[] tmpDirs = ioManager.getSpillingDirectories();
+ File[] tmpDirs = flinkIoManager.getSpillingDirectories();
File tmpDir = tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
+ IOManager ioManager = IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
assigner.open(
+ ioManager,
tmpDir,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getIndexOfThisSubtask(),
this::collect);
Options options = Options.fromMap(table.options());
- long bufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes();
+ long bufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes() / 2;
long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
bootstrapBuffer =
RowBuffer.getBuffer(
- IOManager.create(ioManager.getSpillingDirectoriesPaths()),
+ ioManager,
new HeapMemorySegmentPool(bufferSize, (int) pageSize),
new InternalRowSerializer(table.rowType()),
true);
@@ -106,7 +108,7 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
// is always true
bootstrapBuffer.put(toRow.apply(value));
} else {
- assigner.process(value);
+ assigner.processInput(value);
}
break;
}
@@ -124,10 +126,11 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
private void endBootstrap() throws Exception {
if (bootstrapBuffer != null) {
+ assigner.endBoostrap();
bootstrapBuffer.complete();
try (RowBuffer.RowBufferIterator iterator = bootstrapBuffer.newIterator()) {
while (iterator.advanceNext()) {
- assigner.process(fromRow.apply(iterator.getRow()));
+ assigner.processInput(fromRow.apply(iterator.getRow()));
}
}
bootstrapBuffer.reset();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index 794cb312f..7d5d70b48 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -90,6 +90,7 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
sql(
"create table large_t (pt int, k int, v int, primary key (k)
not enforced) partitioned by (pt) with ("
+ "'bucket'='-1', "
+ + "'rocksdb.compaction.level.target-file-size-base'='2
kb', "
+ "'dynamic-bucket.target-row-num'='10000')");
sql(
"create temporary table src (pt int, k int, v int) with ("
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
index 97503079b..4d12f1e93 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
@@ -85,33 +86,43 @@ public class GlobalIndexAssignerTest extends TableTestBase {
innerTestBucketAssign(true);
}
+ private IOManager ioManager() {
+ return IOManager.create(new File(tempPath.toFile(), "io").getPath());
+ }
+
private void innerTestBucketAssign(boolean enableTtl) throws Exception {
GlobalIndexAssigner<InternalRow> assigner =
createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
List<Integer> output = new ArrayList<>();
- assigner.open(new File(warehouse.getPath()), 2, 0, (row, bucket) -> output.add(bucket));
+ assigner.open(
+ ioManager(),
+ new File(warehouse.getPath()),
+ 2,
+ 0,
+ (row, bucket) -> output.add(bucket));
+ assigner.endBoostrap();
// assign
- assigner.process(GenericRow.of(1, 1, 1));
- assigner.process(GenericRow.of(1, 2, 2));
- assigner.process(GenericRow.of(1, 3, 3));
+ assigner.processInput(GenericRow.of(1, 1, 1));
+ assigner.processInput(GenericRow.of(1, 2, 2));
+ assigner.processInput(GenericRow.of(1, 3, 3));
assertThat(output).containsExactly(0, 0, 0);
output.clear();
// full
- assigner.process(GenericRow.of(1, 4, 4));
+ assigner.processInput(GenericRow.of(1, 4, 4));
assertThat(output).containsExactly(2);
output.clear();
// another partition
- assigner.process(GenericRow.of(2, 5, 5));
+ assigner.processInput(GenericRow.of(2, 5, 5));
assertThat(output).containsExactly(0);
output.clear();
// read assigned
- assigner.process(GenericRow.of(1, 4, 4));
- assigner.process(GenericRow.of(1, 2, 2));
- assigner.process(GenericRow.of(1, 3, 3));
+ assigner.processInput(GenericRow.of(1, 4, 4));
+ assigner.processInput(GenericRow.of(1, 2, 2));
+ assigner.processInput(GenericRow.of(1, 3, 3));
assertThat(output).containsExactly(2, 0, 0);
output.clear();
@@ -123,14 +134,16 @@ public class GlobalIndexAssignerTest extends TableTestBase {
GlobalIndexAssigner<InternalRow> assigner = createAssigner(MergeEngine.DEDUPLICATE);
List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
assigner.open(
+ ioManager(),
new File(warehouse.getPath()),
2,
0,
(row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+ assigner.endBoostrap();
// change partition
- assigner.process(GenericRow.of(1, 1, 1));
- assigner.process(GenericRow.of(2, 1, 2));
+ assigner.processInput(GenericRow.of(1, 1, 1));
+ assigner.processInput(GenericRow.of(2, 1, 2));
assertThat(output)
.containsExactly(
new Tuple2<>(GenericRow.of(1, 1, 1), 0),
@@ -139,14 +152,14 @@ public class GlobalIndexAssignerTest extends TableTestBase {
output.clear();
// test partition 1 deleted
- assigner.process(GenericRow.of(1, 2, 2));
- assigner.process(GenericRow.of(1, 3, 3));
- assigner.process(GenericRow.of(1, 4, 4));
+ assigner.processInput(GenericRow.of(1, 2, 2));
+ assigner.processInput(GenericRow.of(1, 3, 3));
+ assigner.processInput(GenericRow.of(1, 4, 4));
assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
output.clear();
// move from full bucket
- assigner.process(GenericRow.of(2, 4, 4));
+ assigner.processInput(GenericRow.of(2, 4, 4));
assertThat(output)
.containsExactly(
new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 4, 4), 0),
@@ -154,7 +167,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
output.clear();
// test partition 1 deleted
- assigner.process(GenericRow.of(1, 5, 5));
+ assigner.processInput(GenericRow.of(1, 5, 5));
assertThat(output.stream().map(t -> t.f1)).containsExactly(0);
output.clear();
@@ -170,14 +183,16 @@ public class GlobalIndexAssignerTest extends TableTestBase {
GlobalIndexAssigner<InternalRow> assigner = createAssigner(mergeEngine);
List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
assigner.open(
+ ioManager(),
new File(warehouse.getPath()),
2,
0,
(row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+ assigner.endBoostrap();
// change partition
- assigner.process(GenericRow.of(1, 1, 1));
- assigner.process(GenericRow.of(2, 1, 2));
+ assigner.processInput(GenericRow.of(1, 1, 1));
+ assigner.processInput(GenericRow.of(2, 1, 2));
assertThat(output)
.containsExactly(
new Tuple2<>(GenericRow.of(1, 1, 1), 0),
@@ -185,9 +200,9 @@ public class GlobalIndexAssignerTest extends TableTestBase {
output.clear();
// test partition 2 no effect
- assigner.process(GenericRow.of(2, 2, 2));
- assigner.process(GenericRow.of(2, 3, 3));
- assigner.process(GenericRow.of(2, 4, 4));
+ assigner.processInput(GenericRow.of(2, 2, 2));
+ assigner.processInput(GenericRow.of(2, 3, 3));
+ assigner.processInput(GenericRow.of(2, 4, 4));
assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
output.clear();
assigner.close();
@@ -198,21 +213,23 @@ public class GlobalIndexAssignerTest extends TableTestBase {
GlobalIndexAssigner<InternalRow> assigner = createAssigner(MergeEngine.FIRST_ROW);
List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
assigner.open(
+ ioManager(),
new File(warehouse.getPath()),
2,
0,
(row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+ assigner.endBoostrap();
// change partition
- assigner.process(GenericRow.of(1, 1, 1));
- assigner.process(GenericRow.of(2, 1, 2));
+ assigner.processInput(GenericRow.of(1, 1, 1));
+ assigner.processInput(GenericRow.of(2, 1, 2));
assertThat(output).containsExactly(new Tuple2<>(GenericRow.of(1, 1, 1), 0));
output.clear();
// test partition 2 no effect
- assigner.process(GenericRow.of(2, 2, 2));
- assigner.process(GenericRow.of(2, 3, 3));
- assigner.process(GenericRow.of(2, 4, 4));
+ assigner.processInput(GenericRow.of(2, 2, 2));
+ assigner.processInput(GenericRow.of(2, 3, 3));
+ assigner.processInput(GenericRow.of(2, 4, 4));
assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
output.clear();
assigner.close();