This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed3b5db0d [flink] Fix cache size in GlobalIndexAssigner
ed3b5db0d is described below
commit ed3b5db0d211d2c5533e0822dac4277af725d923
Author: Jingsong <[email protected]>
AuthorDate: Thu Sep 7 21:47:28 2023 +0800
[flink] Fix cache size in GlobalIndexAssigner
---
.../java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java | 4 ++--
.../paimon-flink-common/src/test/resources/log4j2-test.properties | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
index 78340b246..9662a3bb4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.flink.RocksDBOptions;
import org.apache.paimon.flink.lookup.RocksDBStateFactory;
import org.apache.paimon.flink.lookup.RocksDBValueState;
import org.apache.paimon.options.Options;
@@ -106,14 +107,13 @@ public class GlobalIndexAssigner<T> implements Serializable {
Options options = coreOptions.toConfiguration();
this.path = new File(tmpDir, "lookup-" + UUID.randomUUID());
this.stateFactory = new RocksDBStateFactory(path.toString(), options);
- long cacheSize = options.get(CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE).getBytes();
RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
this.keyIndex =
stateFactory.valueState(
"keyIndex",
new RowCompactedSerializer(keyType),
new PositiveIntIntSerializer(),
- cacheSize);
+ options.get(RocksDBOptions.LOOKUP_CACHE_ROWS));
this.partMapping = new IDMapping<>(BinaryRow::copy);
this.bucketAssigner = new BucketAssigner();
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties b/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties
index 1b3980d15..e27922dad 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties
+++ b/paimon-flink/paimon-flink-common/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger