This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a27c4d34 [flink] The dim table in lookup join should ignore scan options (#2890)
0a27c4d34 is described below

commit 0a27c4d34a4cd029498100f7151a907a4741220b
Author: yuzelin <[email protected]>
AuthorDate: Fri Feb 23 11:20:20 2024 +0800

    [flink] The dim table in lookup join should ignore scan options (#2890)
---
 .../paimon/table/AbstractFileStoreTable.java       |  2 -
 .../paimon/table/AppendOnlyFileStoreTable.java     |  2 +-
 .../org/apache/paimon/table/FileStoreTable.java    |  2 +
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  2 +-
 paimon-flink/paimon-flink-common/pom.xml           | 12 ++++++
 .../paimon/flink/lookup/FullCacheLookupTable.java  |  4 +-
 ...amingReader.java => LookupStreamingReader.java} | 50 +++++++++++++++-------
 .../org/apache/paimon/flink/LookupJoinITCase.java  | 14 +++++-
 8 files changed, 65 insertions(+), 23 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d00ce913d..6eea9d2ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -175,8 +175,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
 
     protected abstract BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer();
 
-    protected abstract FileStoreTable copy(TableSchema newTableSchema);
-
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
         checkImmutability(dynamicOptions);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 915ab1544..9c97d406c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -66,7 +66,7 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    protected FileStoreTable copy(TableSchema newTableSchema) {
+    public FileStoreTable copy(TableSchema newTableSchema) {
         return new AppendOnlyFileStoreTable(fileIO, path, newTableSchema, catalogEnvironment);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index ab1b2e961..2134d97ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -80,6 +80,8 @@ public interface FileStoreTable extends DataTable {
     @Override
     FileStoreTable copy(Map<String, String> dynamicOptions);
 
+    FileStoreTable copy(TableSchema newTableSchema);
+
     /** Doesn't change table schema even when there exists time travel scan options. */
     FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index a1614765e..2bacb1373 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -75,7 +75,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
     }
 
     @Override
-    protected FileStoreTable copy(TableSchema newTableSchema) {
+    public FileStoreTable copy(TableSchema newTableSchema) {
         return new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema, catalogEnvironment);
     }
 
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index d486aaa42..76b2dc4ff 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -101,6 +101,18 @@ under the License.
                     <groupId>org.pentaho</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-storage-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 19e56c316..d6ccacf87 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -53,7 +53,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
     protected final Context context;
     protected final RocksDBStateFactory stateFactory;
     protected final RowType projectedType;
-    private final TableStreamingReader reader;
+    private final LookupStreamingReader reader;
     private final boolean sequenceFieldEnabled;
 
     public FullCacheLookupTable(Context context) throws IOException {
@@ -64,7 +64,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
                         context.table.coreOptions().toConfiguration(),
                         null);
         FileStoreTable table = context.table;
-        this.reader = new TableStreamingReader(table, context.projection, context.tablePredicate);
+        this.reader = new LookupStreamingReader(table, context.projection, context.tablePredicate);
         this.sequenceFieldEnabled =
                 table.primaryKeys().size() > 0
                         && new CoreOptions(table.options()).sequenceField().isPresent();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
similarity index 78%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index c96eab450..ad6f72670 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -24,9 +24,12 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.io.SplitsParallelReadUtil;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -43,8 +46,10 @@ import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.IntUnaryOperator;
 import java.util.stream.IntStream;
 
@@ -52,8 +57,8 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PAR
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
 
-/** A streaming reader to read table. */
-public class TableStreamingReader {
+/** A streaming reader to load data into {@link LookupTable}. */
+public class LookupStreamingReader {
 
     private final Table table;
     private final int[] projection;
@@ -61,19 +66,19 @@ public class TableStreamingReader {
     @Nullable private final Predicate projectedPredicate;
     private final StreamTableScan scan;
 
-    public TableStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) {
-        this.table = table;
-        this.projection = projection;
-        if (CoreOptions.fromMap(table.options()).startupMode()
-                != CoreOptions.StartupMode.COMPACTED_FULL) {
-            table =
-                    table.copy(
-                            Collections.singletonMap(
-                                    CoreOptions.SCAN_MODE.key(),
-                                    CoreOptions.StartupMode.LATEST_FULL.toString()));
-        }
+    private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
+            Arrays.asList(
+                    CoreOptions.SCAN_TIMESTAMP_MILLIS,
+                    CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
+                    CoreOptions.SCAN_SNAPSHOT_ID,
+                    CoreOptions.SCAN_TAG_NAME,
+                    CoreOptions.SCAN_VERSION);
 
-        this.readBuilder = table.newReadBuilder().withProjection(projection).withFilter(predicate);
+    public LookupStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) {
+        this.table = unsetTimeTravelOptions(table);
+        this.projection = projection;
+        this.readBuilder =
+                this.table.newReadBuilder().withProjection(projection).withFilter(predicate);
         scan = readBuilder.newStreamScan();
 
         if (predicate != null) {
@@ -100,6 +105,21 @@ public class TableStreamingReader {
         }
     }
 
+    private Table unsetTimeTravelOptions(Table origin) {
+        FileStoreTable fileStoreTable = (FileStoreTable) origin;
+        Map<String, String> newOptions = new HashMap<>(fileStoreTable.options());
+        TIME_TRAVEL_OPTIONS.stream().map(ConfigOption::key).forEach(newOptions::remove);
+
+        CoreOptions.StartupMode startupMode = CoreOptions.fromMap(newOptions).startupMode();
+        if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) {
+            startupMode = CoreOptions.StartupMode.LATEST_FULL;
+        }
+        newOptions.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
+
+        TableSchema newSchema = fileStoreTable.schema().copy(newOptions);
+        return fileStoreTable.copy(newSchema);
+    }
+
     public RecordReader<InternalRow> nextBatch(boolean useParallelism, boolean readSequenceNumber)
             throws Exception {
         List<Split> splits = scan.plan().splits();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index c0e6bea9a..5fc9af74b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -135,11 +136,20 @@ public class LookupJoinITCase extends CatalogITCaseBase {
 
     @ParameterizedTest
     @EnumSource(LookupCacheMode.class)
-    public void testLookupWithLatest(LookupCacheMode cacheMode) throws Exception {
+    public void testLookupIgnoreScanOptions(LookupCacheMode cacheMode) throws Exception {
         initTable(cacheMode);
         sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String scanOption;
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            scanOption = "'scan.mode'='latest'";
+        } else {
+            scanOption = "'scan.snapshot-id'='2'";
+        }
         String query =
-                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('scan.mode'='latest') */"
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS("
+                        + scanOption
+                        + ") */"
                         + " for system_time as of T.proctime AS D ON T.i = D.i";
         BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
 

Reply via email to