This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0a27c4d34 [flink] The dim table in lookup join should ignore scan options (#2890)
0a27c4d34 is described below
commit 0a27c4d34a4cd029498100f7151a907a4741220b
Author: yuzelin <[email protected]>
AuthorDate: Fri Feb 23 11:20:20 2024 +0800
[flink] The dim table in lookup join should ignore scan options (#2890)
---
.../paimon/table/AbstractFileStoreTable.java | 2 -
.../paimon/table/AppendOnlyFileStoreTable.java | 2 +-
.../org/apache/paimon/table/FileStoreTable.java | 2 +
.../paimon/table/PrimaryKeyFileStoreTable.java | 2 +-
paimon-flink/paimon-flink-common/pom.xml | 12 ++++++
.../paimon/flink/lookup/FullCacheLookupTable.java | 4 +-
...amingReader.java => LookupStreamingReader.java} | 50 +++++++++++++++-------
.../org/apache/paimon/flink/LookupJoinITCase.java | 14 +++++-
8 files changed, 65 insertions(+), 23 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d00ce913d..6eea9d2ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -175,8 +175,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
protected abstract BiConsumer<FileStoreScan, Predicate>
nonPartitionFilterConsumer();
- protected abstract FileStoreTable copy(TableSchema newTableSchema);
-
@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
checkImmutability(dynamicOptions);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 915ab1544..9c97d406c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -66,7 +66,7 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- protected FileStoreTable copy(TableSchema newTableSchema) {
+ public FileStoreTable copy(TableSchema newTableSchema) {
return new AppendOnlyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index ab1b2e961..2134d97ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -80,6 +80,8 @@ public interface FileStoreTable extends DataTable {
@Override
FileStoreTable copy(Map<String, String> dynamicOptions);
+ FileStoreTable copy(TableSchema newTableSchema);
+
/** Doesn't change table schema even when there exists time travel scan
options. */
FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index a1614765e..2bacb1373 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -75,7 +75,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
}
@Override
- protected FileStoreTable copy(TableSchema newTableSchema) {
+ public FileStoreTable copy(TableSchema newTableSchema) {
return new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment);
}
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index d486aaa42..76b2dc4ff 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -101,6 +101,18 @@ under the License.
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-storage-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 19e56c316..d6ccacf87 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -53,7 +53,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
protected final Context context;
protected final RocksDBStateFactory stateFactory;
protected final RowType projectedType;
- private final TableStreamingReader reader;
+ private final LookupStreamingReader reader;
private final boolean sequenceFieldEnabled;
public FullCacheLookupTable(Context context) throws IOException {
@@ -64,7 +64,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
context.table.coreOptions().toConfiguration(),
null);
FileStoreTable table = context.table;
- this.reader = new TableStreamingReader(table, context.projection,
context.tablePredicate);
+ this.reader = new LookupStreamingReader(table, context.projection,
context.tablePredicate);
this.sequenceFieldEnabled =
table.primaryKeys().size() > 0
&& new
CoreOptions(table.options()).sequenceField().isPresent();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
similarity index 78%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index c96eab450..ad6f72670 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -24,9 +24,12 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.ReadBuilder;
@@ -43,8 +46,10 @@ import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
@@ -52,8 +57,8 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PAR
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
-/** A streaming reader to read table. */
-public class TableStreamingReader {
+/** A streaming reader to load data into {@link LookupTable}. */
+public class LookupStreamingReader {
private final Table table;
private final int[] projection;
@@ -61,19 +66,19 @@ public class TableStreamingReader {
@Nullable private final Predicate projectedPredicate;
private final StreamTableScan scan;
- public TableStreamingReader(Table table, int[] projection, @Nullable
Predicate predicate) {
- this.table = table;
- this.projection = projection;
- if (CoreOptions.fromMap(table.options()).startupMode()
- != CoreOptions.StartupMode.COMPACTED_FULL) {
- table =
- table.copy(
- Collections.singletonMap(
- CoreOptions.SCAN_MODE.key(),
-
CoreOptions.StartupMode.LATEST_FULL.toString()));
- }
+ private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
+ Arrays.asList(
+ CoreOptions.SCAN_TIMESTAMP_MILLIS,
+ CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
+ CoreOptions.SCAN_SNAPSHOT_ID,
+ CoreOptions.SCAN_TAG_NAME,
+ CoreOptions.SCAN_VERSION);
- this.readBuilder =
table.newReadBuilder().withProjection(projection).withFilter(predicate);
+ public LookupStreamingReader(Table table, int[] projection, @Nullable
Predicate predicate) {
+ this.table = unsetTimeTravelOptions(table);
+ this.projection = projection;
+ this.readBuilder =
+
this.table.newReadBuilder().withProjection(projection).withFilter(predicate);
scan = readBuilder.newStreamScan();
if (predicate != null) {
@@ -100,6 +105,21 @@ public class TableStreamingReader {
}
}
+ private Table unsetTimeTravelOptions(Table origin) {
+ FileStoreTable fileStoreTable = (FileStoreTable) origin;
+ Map<String, String> newOptions = new
HashMap<>(fileStoreTable.options());
+
TIME_TRAVEL_OPTIONS.stream().map(ConfigOption::key).forEach(newOptions::remove);
+
+ CoreOptions.StartupMode startupMode =
CoreOptions.fromMap(newOptions).startupMode();
+ if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) {
+ startupMode = CoreOptions.StartupMode.LATEST_FULL;
+ }
+ newOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
+
+ TableSchema newSchema = fileStoreTable.schema().copy(newOptions);
+ return fileStoreTable.copy(newSchema);
+ }
+
public RecordReader<InternalRow> nextBatch(boolean useParallelism, boolean
readSequenceNumber)
throws Exception {
List<Split> splits = scan.plan().splits();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index c0e6bea9a..5fc9af74b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.params.provider.EnumSource;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@@ -135,11 +136,20 @@ public class LookupJoinITCase extends CatalogITCaseBase {
@ParameterizedTest
@EnumSource(LookupCacheMode.class)
- public void testLookupWithLatest(LookupCacheMode cacheMode) throws
Exception {
+ public void testLookupIgnoreScanOptions(LookupCacheMode cacheMode) throws
Exception {
initTable(cacheMode);
sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+ String scanOption;
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ scanOption = "'scan.mode'='latest'";
+ } else {
+ scanOption = "'scan.snapshot-id'='2'";
+ }
String query =
- "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+
OPTIONS('scan.mode'='latest') */"
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS("
+ + scanOption
+ + ") */"
+ " for system_time as of T.proctime AS D ON T.i =
D.i";
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());