This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7c4b68ed0 [core] Add hint in local lookup file tracking with remote
file (#3814)
7c4b68ed0 is described below
commit 7c4b68ed06c441b3c6f7e9df896c4492ae8985e1
Author: Aitozi <[email protected]>
AuthorDate: Wed Jul 31 16:32:20 2024 +0800
[core] Add hint in local lookup file tracking with remote file (#3814)
---
.../paimon/utils/InternalRowPartitionComputer.java | 19 +++++++
.../utils/InternalRowPartitionComputerTest.java | 58 ++++++++++++++++++++++
.../java/org/apache/paimon/KeyValueFileStore.java | 1 +
.../org/apache/paimon/disk/FileChannelManager.java | 3 ++
.../apache/paimon/disk/FileChannelManagerImpl.java | 6 +++
.../java/org/apache/paimon/disk/FileIOChannel.java | 5 ++
.../java/org/apache/paimon/disk/IOManager.java | 2 +
.../java/org/apache/paimon/disk/IOManagerImpl.java | 5 ++
.../org/apache/paimon/mergetree/LookupFile.java | 12 +++++
.../org/apache/paimon/mergetree/LookupLevels.java | 7 ++-
.../paimon/operation/KeyValueFileStoreWrite.java | 20 ++++++--
.../apache/paimon/table/query/LocalTableQuery.java | 17 ++++++-
.../paimon/mergetree/ContainsLevelsTest.java | 2 +-
.../apache/paimon/mergetree/LookupFileTest.java | 49 ++++++++++++++++++
.../apache/paimon/mergetree/LookupLevelsTest.java | 2 +-
.../flink/source/TestChangelogDataReadWrite.java | 3 ++
16 files changed, 200 insertions(+), 11 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index bdc5197fe..07746d58d 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -18,6 +18,7 @@
package org.apache.paimon.utils;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowType;
@@ -77,4 +78,22 @@ public class InternalRowPartitionComputer {
}
return partValues;
}
+
+ public static String paritionToString(
+ RowType partitionType, BinaryRow partition, String delimiter) {
+ InternalRow.FieldGetter[] getters = partitionType.fieldGetters();
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < getters.length; i++) {
+ Object part = getters[i].getFieldOrNull(partition);
+ if (part != null) {
+ builder.append(part);
+ } else {
+ builder.append("null");
+ }
+ if (i != getters.length - 1) {
+ builder.append(delimiter);
+ }
+ }
+ return builder.toString();
+ }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
new file mode 100644
index 000000000..fcd79fa4c
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link InternalRowPartitionComputer}. */
+public class InternalRowPartitionComputerTest {
+
+ @Test
+ public void testPartitionToString() {
+ RowType rowType = RowType.of();
+ BinaryRow binaryRow = new BinaryRow(0);
+ BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+ assertThat(InternalRowPartitionComputer.paritionToString(rowType,
binaryRow, "-"))
+ .isEqualTo("");
+
+ rowType = RowType.of(DataTypes.STRING(), DataTypes.INT());
+ binaryRow = new BinaryRow(2);
+ writer = new BinaryRowWriter(binaryRow);
+ writer.writeString(0, BinaryString.fromString("20240731"));
+ writer.writeInt(1, 10);
+ assertThat(InternalRowPartitionComputer.paritionToString(rowType,
binaryRow, "-"))
+ .isEqualTo("20240731-10");
+
+ rowType = RowType.of(DataTypes.STRING(), DataTypes.INT());
+ binaryRow = new BinaryRow(2);
+ writer = new BinaryRowWriter(binaryRow);
+ writer.setNullAt(0);
+ writer.writeInt(1, 10);
+ assertThat(InternalRowPartitionComputer.paritionToString(rowType,
binaryRow, "-"))
+ .isEqualTo("null-10");
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index c17f1c252..26341d045 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -169,6 +169,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
schemaManager,
schema,
commitUser,
+ partitionType,
keyType,
valueType,
keyComparatorSupplier,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java
b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java
index 1c6a709cf..7349d4566 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManager.java
@@ -26,6 +26,9 @@ public interface FileChannelManager extends AutoCloseable {
/** Creates an ID identifying an underlying file channel and returns it. */
FileIOChannel.ID createChannel();
+ /** Creates an ID identifying an underlying file channel and returns it. */
+ FileIOChannel.ID createChannel(String prefix);
+
/** Creates an enumerator for channels that logically belong together and
returns it. */
FileIOChannel.Enumerator createChannelEnumerator();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
index d78a361c9..ce175e90b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/disk/FileChannelManagerImpl.java
@@ -89,6 +89,12 @@ public class FileChannelManagerImpl implements
FileChannelManager {
return new ID(paths[num], num, random);
}
+ @Override
+ public ID createChannel(String prefix) {
+ int num = (int) (nextPath.getAndIncrement() % paths.length);
+ return new ID(paths[num], num, prefix, random);
+ }
+
@Override
public Enumerator createChannelEnumerator() {
return new Enumerator(paths, random);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java
b/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java
index 5c9a262df..4b72029dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/FileIOChannel.java
@@ -102,6 +102,11 @@ public interface FileIOChannel {
this.bucketNum = bucketNum;
}
+ public ID(File basePath, int bucketNum, String prefix, Random random) {
+ this.path = new File(basePath, prefix + "-" + randomString(random)
+ ".channel");
+ this.bucketNum = bucketNum;
+ }
+
/** Returns the path to the underlying temporary file. */
public String getPath() {
return path.getAbsolutePath();
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
b/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
index 8e6b67f83..510492583 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
@@ -34,6 +34,8 @@ public interface IOManager extends AutoCloseable {
ID createChannel();
+ ID createChannel(String prefix);
+
String[] tempDirs();
Enumerator createChannelEnumerator();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index 50b8621ec..d39c8efb5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -77,6 +77,11 @@ public class IOManagerImpl implements IOManager {
return fileChannelManager.createChannel();
}
+ @Override
+ public ID createChannel(String prefix) {
+ return fileChannelManager.createChannel(prefix);
+ }
+
@Override
public String[] tempDirs() {
return tempDirs;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index 429ede810..4a1b04f52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -105,4 +105,16 @@ public class LookupFile implements Closeable {
}
}
}
+
+ public static String localFilePrefix(
+ String partition, int bucket, String remoteFileName, int length) {
+ String identifier;
+ if (partition.isEmpty()) {
+ identifier = String.format("%s-%s", bucket, remoteFileName);
+ } else {
+ identifier = String.format("%s-%s-%s", partition, bucket,
remoteFileName);
+ }
+
+ return identifier.substring(0, Math.min(identifier.length(), length));
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 6cb3bc61b..da1192a6e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -47,7 +47,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
-import java.util.function.Supplier;
import static org.apache.paimon.utils.VarLengthIntUtils.MAX_VAR_LONG_SIZE;
import static org.apache.paimon.utils.VarLengthIntUtils.decodeLong;
@@ -61,7 +60,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
private final RowCompactedSerializer keySerializer;
private final ValueProcessor<T> valueProcessor;
private final IOFunction<DataFileMeta, RecordReader<KeyValue>>
fileReaderFactory;
- private final Supplier<File> localFileFactory;
+ private final Function<String, File> localFileFactory;
private final LookupStoreFactory lookupStoreFactory;
private final Function<Long, BloomFilter.Builder> bfGenerator;
@@ -74,7 +73,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
RowType keyType,
ValueProcessor<T> valueProcessor,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
- Supplier<File> localFileFactory,
+ Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
@@ -145,7 +144,7 @@ public class LookupLevels<T> implements
Levels.DropFileCallback, Closeable {
}
private LookupFile createLookupFile(DataFileMeta file) throws IOException {
- File localFile = localFileFactory.get();
+ File localFile = localFileFactory.apply(file.fileName());
if (!localFile.createNewFile()) {
throw new IOException("Can not create new file: " + localFile);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 85a796d2e..545f20f09 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -69,6 +69,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;
@@ -104,6 +105,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final FileIO fileIO;
private final RowType keyType;
private final RowType valueType;
+ private final RowType partitionType;
@Nullable private final RecordLevelExpire recordLevelExpire;
@Nullable private Cache<String, LookupFile> lookupFileCache;
@@ -112,6 +114,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
SchemaManager schemaManager,
TableSchema schema,
String commitUser,
+ RowType partitionType,
RowType keyType,
RowType valueType,
Supplier<Comparator<InternalRow>> keyComparatorSupplier,
@@ -136,6 +139,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
deletionVectorsMaintainerFactory,
tableName);
this.fileIO = fileIO;
+ this.partitionType = partitionType;
this.keyType = keyType;
this.valueType = valueType;
this.udsComparatorSupplier = udsComparatorSupplier;
@@ -325,7 +329,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
return new LookupMergeTreeCompactRewriter(
maxLevel,
mergeEngine,
- createLookupLevels(levels, processor, lookupReaderFactory),
+ createLookupLevels(partition, bucket, levels, processor,
lookupReaderFactory),
readerFactory,
writerFactory,
keyComparator,
@@ -347,6 +351,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
}
private <T> LookupLevels<T> createLookupLevels(
+ BinaryRow partition,
+ int bucket,
Levels levels,
LookupLevels.ValueProcessor<T> valueProcessor,
FileReaderFactory<KeyValue> readerFactory) {
@@ -366,14 +372,22 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}
-
return new LookupLevels<>(
levels,
keyComparatorSupplier.get(),
keyType,
valueProcessor,
readerFactory::createRecordReader,
- () -> ioManager.createChannel().getPathFile(),
+ file ->
+ ioManager
+ .createChannel(
+ LookupFile.localFilePrefix(
+
InternalRowPartitionComputer.paritionToString(
+ partitionType,
partition, "-"),
+ bucket,
+ file,
+ 100))
+ .getPathFile(),
lookupStoreFactory,
bfGenerator(options),
lookupFileCache);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index f0a32d101..5f5463a70 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -39,6 +39,7 @@ import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;
@@ -75,6 +76,8 @@ public class LocalTableQuery implements TableQuery {
@Nullable private Cache<String, LookupFile> lookupFileCache;
+ private final RowType partitionType;
+
public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
@@ -86,6 +89,7 @@ public class LocalTableQuery implements TableQuery {
KeyValueFileStore store = (KeyValueFileStore) tableStore;
this.readerFactoryBuilder = store.newReaderFactoryBuilder();
+ this.partitionType = table.schema().logicalPartitionType();
RowType keyType = readerFactoryBuilder.keyType();
this.keyComparatorSupplier = new
KeyComparatorSupplier(readerFactoryBuilder.keyType());
this.lookupStoreFactory =
@@ -155,9 +159,18 @@ public class LocalTableQuery implements TableQuery {
file.fileName(),
file.fileSize(),
file.level()),
- () ->
+ file ->
Preconditions.checkNotNull(ioManager,
"IOManager is required.")
- .createChannel()
+ .createChannel(
+ LookupFile.localFilePrefix(
+
InternalRowPartitionComputer
+
.paritionToString(
+
partitionType,
+
partition,
+ "-"),
+ bucket,
+ file,
+ 100))
.getPathFile(),
lookupStoreFactory,
bfGenerator(options),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 086548547..47a9dd902 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -192,7 +192,7 @@ public class ContainsLevelsTest {
createReaderFactory()
.createRecordReader(
0, file.fileName(), file.fileSize(),
file.level()),
- () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
+ file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048,
0.75, "none"),
rowCount -> BloomFilter.builder(rowCount, 0.01),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java
new file mode 100644
index 000000000..1fcebb338
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LookupFile}. */
+public class LookupFileTest {
+
+ @Test
+ public void testLocalFilePrefix() {
+ assertThat(
+ LookupFile.localFilePrefix(
+ "2024073105",
+ 10,
+
"data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc",
+ 100))
+
.isEqualTo("2024073105-10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc");
+ assertThat(
+ LookupFile.localFilePrefix(
+ "", 10,
"data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc", 100))
+
.isEqualTo("10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc");
+ assertThat(
+ LookupFile.localFilePrefix(
+ "2024073105",
+ 10,
+
"data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc",
+ 20))
+ .isEqualTo("2024073105-10-data-c");
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 9945b54b9..f9b4bf727 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -271,7 +271,7 @@ public class LookupLevelsTest {
createReaderFactory()
.createRecordReader(
0, file.fileName(), file.fileSize(),
file.level()),
- () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
+ file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048,
0.75, "none"),
rowCount -> BloomFilter.builder(rowCount, 0.05),
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index edd6688da..66566f873 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -70,6 +70,8 @@ public class TestChangelogDataReadWrite {
new RowType(singletonList(new DataField(0, "k", new
BigIntType())));
private static final RowType VALUE_TYPE =
new RowType(singletonList(new DataField(0, "v", new
BigIntType())));
+ private static final RowType PARTITION_TYPE =
+ new RowType(singletonList(new DataField(0, "p", new IntType())));
private static final Comparator<InternalRow> COMPARATOR =
Comparator.comparingLong(o -> o.getLong(0));
private static final RecordEqualiser EQUALISER =
@@ -171,6 +173,7 @@ public class TestChangelogDataReadWrite {
schemaManager,
schemaManager.schema(0),
commitUser,
+ PARTITION_TYPE,
KEY_TYPE,
VALUE_TYPE,
() -> COMPARATOR,