This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d3ce11811 [flink] Support count star push down to source for append
table (#4236)
d3ce11811 is described below
commit d3ce11811c67e5cbf70afee16884efe3fc2820df
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 24 17:44:35 2024 +0800
[flink] Support count star push down to source for append table (#4236)
---
.../org/apache/paimon/manifest/PartitionEntry.java | 37 +-
.../paimon/table/DelegatedFileStoreTable.java | 4 +
.../paimon/table/FallbackReadFileStoreTable.java | 8 -
.../paimon/table/source/AbstractDataTableScan.java | 6 -
.../paimon/table/source/DataTableBatchScan.java | 9 +
.../paimon/table/source/DataTableStreamScan.java | 9 +
.../source/snapshot/AbstractStartingScanner.java | 14 +
.../source/snapshot/CompactedStartingScanner.java | 9 +-
.../ContinuousFromSnapshotFullStartingScanner.java | 9 +-
.../snapshot/FileCreationTimeStartingScanner.java | 18 +-
.../snapshot/FullCompactedStartingScanner.java | 9 +-
.../table/source/snapshot/FullStartingScanner.java | 9 +-
...ngScanner.java => ReadPlanStartingScanner.java} | 42 +-
.../table/source/snapshot/StartingScanner.java | 3 +
.../StaticFromSnapshotStartingScanner.java | 9 +-
.../snapshot/StaticFromTagStartingScanner.java | 7 +-
.../StaticFromTimestampStartingScanner.java | 9 +-
.../StaticFromWatermarkStartingScanner.java | 7 +-
.../apache/paimon/table/system/AuditLogTable.java | 60 +--
.../paimon/flink/source/DataTableSource.java | 12 +-
.../flink/lookup/DynamicPartitionLoader.java | 8 +-
.../paimon/flink/lookup/FullCacheLookupTable.java | 4 +-
.../paimon/flink/lookup/LookupDataTableScan.java | 4 +-
.../paimon/flink/lookup/LookupFileStoreTable.java | 26 --
.../paimon/flink/lookup/LookupStreamingReader.java | 5 +-
.../paimon/flink/source/BaseDataTableSource.java | 94 ++++-
.../paimon/flink/source/DataTableSource.java | 12 +-
.../paimon/flink/source/FlinkTableSource.java | 38 +-
.../flink/source/NumberSequenceRowSource.java | 436 +++++++++++++++++++++
.../apache/paimon/flink/BatchFileStoreITCase.java | 59 +++
.../paimon/flink/source/IteratorSourcesITCase.java | 98 +++++
.../flink/source/NumberSequenceRowSourceTest.java | 195 +++++++++
.../FileStoreTableStatisticsTestBase.java | 9 +-
.../statistics/PrimaryKeyTableStatisticsTest.java | 3 +-
.../apache/paimon/flink/util/AbstractTestBase.java | 44 +++
35 files changed, 1146 insertions(+), 179 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
index 22b7fc2fe..1aa562444 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -20,12 +20,15 @@ package org.apache.paimon.manifest;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.source.DataSplit;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import static org.apache.paimon.manifest.FileKind.ADD;
import static org.apache.paimon.manifest.FileKind.DELETE;
/** Entry representing a partition. */
@@ -81,20 +84,21 @@ public class PartitionEntry {
}
public static PartitionEntry fromManifestEntry(ManifestEntry entry) {
- long recordCount = entry.file().rowCount();
- long fileSizeInBytes = entry.file().fileSize();
+ return fromDataFile(entry.partition(), entry.kind(), entry.file());
+ }
+
+ public static PartitionEntry fromDataFile(
+ BinaryRow partition, FileKind kind, DataFileMeta file) {
+ long recordCount = file.rowCount();
+ long fileSizeInBytes = file.fileSize();
long fileCount = 1;
- if (entry.kind() == DELETE) {
+ if (kind == DELETE) {
recordCount = -recordCount;
fileSizeInBytes = -fileSizeInBytes;
fileCount = -fileCount;
}
return new PartitionEntry(
- entry.partition(),
- recordCount,
- fileSizeInBytes,
- fileCount,
- entry.file().creationTimeEpochMillis());
+ partition, recordCount, fileSizeInBytes, fileCount,
file.creationTimeEpochMillis());
}
public static Collection<PartitionEntry> merge(Collection<ManifestEntry>
fileEntries) {
@@ -108,6 +112,23 @@ public class PartitionEntry {
return partitions.values();
}
+ public static Collection<PartitionEntry> mergeSplits(Collection<DataSplit>
splits) {
+ Map<BinaryRow, PartitionEntry> partitions = new HashMap<>();
+ for (DataSplit split : splits) {
+ BinaryRow partition = split.partition();
+ for (DataFileMeta file : split.dataFiles()) {
+ PartitionEntry partitionEntry = fromDataFile(partition, ADD,
file);
+ partitions.compute(
+ partition,
+ (part, old) -> old == null ? partitionEntry :
old.merge(partitionEntry));
+ }
+
+ // Ignore before files, because we don't know how to merge them
+ // Ignore deletion files, because it is costly to read from it
+ }
+ return partitions.values();
+ }
+
public static void merge(Collection<PartitionEntry> from, Map<BinaryRow,
PartitionEntry> to) {
for (PartitionEntry entry : from) {
to.compute(entry.partition(), (part, old) -> old == null ? entry :
old.merge(entry));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index a60e17496..dc758a1af 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -58,6 +58,10 @@ public abstract class DelegatedFileStoreTable implements
FileStoreTable {
this.wrapped = wrapped;
}
+ public FileStoreTable wrapped() {
+ return wrapped;
+ }
+
@Override
public String name() {
return wrapped.name();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index f8be79cb4..7e0d4ab98 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -46,7 +46,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -304,13 +303,6 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
return new DataFilePlan(splits);
}
- @Override
- public List<BinaryRow> listPartitions() {
- Set<BinaryRow> partitions = new
LinkedHashSet<>(mainScan.listPartitions());
- partitions.addAll(fallbackScan.listPartitions());
- return new ArrayList<>(partitions);
- }
-
@Override
public List<PartitionEntry> listPartitionEntries() {
List<PartitionEntry> partitionEntries =
mainScan.listPartitionEntries();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index ba1bc6588..6a8aa9265 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -24,7 +24,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
@@ -240,9 +239,4 @@ public abstract class AbstractDataTableScan implements
DataTableScan {
"Unknown startup mode " + startupMode.name());
}
}
-
- @Override
- public List<PartitionEntry> listPartitionEntries() {
- return snapshotReader.partitionEntries();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 8cf842aee..93ab5ba16 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -81,6 +82,14 @@ public class DataTableBatchScan extends
AbstractDataTableScan {
}
}
+ @Override
+ public List<PartitionEntry> listPartitionEntries() {
+ if (startingScanner == null) {
+ startingScanner = createStartingScanner(false);
+ }
+ return startingScanner.scanPartitions(snapshotReader);
+ }
+
private StartingScanner.Result applyPushDownLimit(StartingScanner.Result
result) {
if (pushDownLimit != null && result instanceof ScannedResult) {
long scannedRowCount = 0;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 4cd221996..a68c7b1cb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.lookup.LookupStrategy;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
@@ -44,6 +45,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.List;
+
import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
/** {@link StreamTableScan} implementation for streaming planning. */
@@ -110,6 +113,12 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
}
}
+ @Override
+ public List<PartitionEntry> listPartitionEntries() {
+ throw new UnsupportedOperationException(
+ "List Partition Entries is not supported in Stream Scan.");
+ }
+
private void initScanner() {
if (startingScanner == null) {
startingScanner = createStartingScanner(true);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
index d711781d7..c1b2f985a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
@@ -18,9 +18,14 @@
package org.apache.paimon.table.source.snapshot;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
/** The abstract class for StartingScanner. */
public abstract class AbstractStartingScanner implements StartingScanner {
@@ -44,4 +49,13 @@ public abstract class AbstractStartingScanner implements
StartingScanner {
return new StartingContext(startingSnapshotId, startingScanMode()
== ScanMode.ALL);
}
}
+
+ @Override
+ public List<PartitionEntry> scanPartitions(SnapshotReader snapshotReader) {
+ Result result = scan(snapshotReader);
+ if (result instanceof ScannedResult) {
+ return new ArrayList<>(PartitionEntry.mergeSplits(((ScannedResult)
result).splits()));
+ }
+ return Collections.emptyList();
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index a15954627..353321a87 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
/** {@link StartingScanner} for the {@link
CoreOptions.StartupMode#COMPACTED_FULL} startup mode. */
-public class CompactedStartingScanner extends AbstractStartingScanner {
+public class CompactedStartingScanner extends ReadPlanStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(CompactedStartingScanner.class);
@@ -44,13 +44,13 @@ public class CompactedStartingScanner extends
AbstractStartingScanner {
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
Long startingSnapshotId = pick();
if (startingSnapshotId == null) {
startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Wait for the
snapshot generation.");
- return new NoSnapshot();
+ return null;
} else {
LOG.debug(
"No compact snapshot found, reading from the latest
snapshot {}.",
@@ -58,8 +58,7 @@ public class CompactedStartingScanner extends
AbstractStartingScanner {
}
}
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
index 3ffd421d0..d7b4b1eeb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
@@ -26,7 +26,7 @@ import org.apache.paimon.utils.SnapshotManager;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode
* of a batch read.
*/
-public class ContinuousFromSnapshotFullStartingScanner extends
AbstractStartingScanner {
+public class ContinuousFromSnapshotFullStartingScanner extends
ReadPlanStartingScanner {
public ContinuousFromSnapshotFullStartingScanner(
SnapshotManager snapshotManager, long snapshotId) {
@@ -40,13 +40,12 @@ public class ContinuousFromSnapshotFullStartingScanner
extends AbstractStartingS
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
if (earliestSnapshotId == null) {
- return new NoSnapshot();
+ return null;
}
long ceiledSnapshotId = Math.max(startingSnapshotId,
earliestSnapshotId);
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId).read());
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
index 0ff55152a..ad557b22a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_FILE_CREATION_TIME} startup
* mode.
*/
-public class FileCreationTimeStartingScanner extends AbstractStartingScanner {
+public class FileCreationTimeStartingScanner extends ReadPlanStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(FileCreationTimeStartingScanner.class);
@@ -47,18 +47,16 @@ public class FileCreationTimeStartingScanner extends
AbstractStartingScanner {
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
Long startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot
generation.");
- return new NoSnapshot();
+ return null;
}
- return StartingScanner.fromPlan(
- snapshotReader
- .withMode(ScanMode.ALL)
- .withSnapshot(startingSnapshotId)
- .withManifestEntryFilter(
- entry ->
entry.file().creationTimeEpochMillis() >= startupMillis)
- .read());
+ return snapshotReader
+ .withMode(ScanMode.ALL)
+ .withSnapshot(startingSnapshotId)
+ .withManifestEntryFilter(
+ entry -> entry.file().creationTimeEpochMillis() >=
startupMillis);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
index 5015d56c7..7fa334d8c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
@@ -33,7 +33,7 @@ import javax.annotation.Nullable;
* {@link StartingScanner} for the {@link StartupMode#COMPACTED_FULL} startup
mode with
* 'full-compaction.delta-commits'.
*/
-public class FullCompactedStartingScanner extends AbstractStartingScanner {
+public class FullCompactedStartingScanner extends ReadPlanStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(FullCompactedStartingScanner.class);
@@ -62,13 +62,13 @@ public class FullCompactedStartingScanner extends
AbstractStartingScanner {
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
Long startingSnapshotId = pick();
if (startingSnapshotId == null) {
startingSnapshotId = snapshotManager.latestSnapshotId();
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Wait for the
snapshot generation.");
- return new NoSnapshot();
+ return null;
} else {
LOG.debug(
"No compact snapshot found, reading from the latest
snapshot {}.",
@@ -76,8 +76,7 @@ public class FullCompactedStartingScanner extends
AbstractStartingScanner {
}
}
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
}
public static boolean isFullCompactedIdentifier(long identifier, int
deltaCommits) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index fc9b49d2d..1189bc68e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** {@link StartingScanner} for the {@link
CoreOptions.StartupMode#LATEST_FULL} startup mode. */
-public class FullStartingScanner extends AbstractStartingScanner {
+public class FullStartingScanner extends ReadPlanStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(FullStartingScanner.class);
@@ -41,16 +41,15 @@ public class FullStartingScanner extends
AbstractStartingScanner {
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
if (startingSnapshotId == null) {
// try to get first snapshot again
startingSnapshotId = snapshotManager.latestSnapshotId();
}
if (startingSnapshotId == null) {
LOG.debug("There is currently no snapshot. Waiting for snapshot
generation.");
- return new NoSnapshot();
+ return null;
}
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ReadPlanStartingScanner.java
similarity index 55%
copy from
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
copy to
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ReadPlanStartingScanner.java
index 3ffd421d0..56b7680eb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ReadPlanStartingScanner.java
@@ -18,35 +18,39 @@
package org.apache.paimon.table.source.snapshot;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.utils.SnapshotManager;
-/**
- * {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode
- * of a batch read.
- */
-public class ContinuousFromSnapshotFullStartingScanner extends
AbstractStartingScanner {
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+/** An {@link AbstractStartingScanner} to return plan. */
+public abstract class ReadPlanStartingScanner extends AbstractStartingScanner {
- public ContinuousFromSnapshotFullStartingScanner(
- SnapshotManager snapshotManager, long snapshotId) {
+ ReadPlanStartingScanner(SnapshotManager snapshotManager) {
super(snapshotManager);
- this.startingSnapshotId = snapshotId;
}
- @Override
- public ScanMode startingScanMode() {
- return ScanMode.ALL;
- }
+ @Nullable
+ protected abstract SnapshotReader configure(SnapshotReader snapshotReader);
@Override
public Result scan(SnapshotReader snapshotReader) {
- Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
- if (earliestSnapshotId == null) {
+ SnapshotReader configured = configure(snapshotReader);
+ if (configured == null) {
return new NoSnapshot();
}
- long ceiledSnapshotId = Math.max(startingSnapshotId,
earliestSnapshotId);
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId).read());
+ return StartingScanner.fromPlan(configured.read());
+ }
+
+ @Override
+ public List<PartitionEntry> scanPartitions(SnapshotReader snapshotReader) {
+ SnapshotReader configured = configure(snapshotReader);
+ if (configured == null) {
+ return Collections.emptyList();
+ }
+ return configured.partitionEntries();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 98dcaf669..6d9daa70a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.source.snapshot;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.TableScan;
@@ -32,6 +33,8 @@ public interface StartingScanner {
Result scan(SnapshotReader snapshotReader);
+ List<PartitionEntry> scanPartitions(SnapshotReader snapshotReader);
+
/** Scan result of {@link #scan}. */
interface Result {}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index ca0f0a91a..19b9da244 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -31,7 +31,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
* CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
*/
-public class StaticFromSnapshotStartingScanner extends AbstractStartingScanner
{
+public class StaticFromSnapshotStartingScanner extends ReadPlanStartingScanner
{
private static final Logger LOG =
LoggerFactory.getLogger(StaticFromSnapshotStartingScanner.class);
@@ -47,13 +47,13 @@ public class StaticFromSnapshotStartingScanner extends
AbstractStartingScanner {
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (earliestSnapshotId == null || latestSnapshotId == null) {
LOG.warn("There is currently no snapshot. Waiting for snapshot
generation.");
- return new NoSnapshot();
+ return null;
}
// Checks earlier whether the specified scan snapshot id is valid and
throws the correct
@@ -65,7 +65,6 @@ public class StaticFromSnapshotStartingScanner extends
AbstractStartingScanner {
earliestSnapshotId,
latestSnapshotId);
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index 3850f41a8..4fa070299 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -25,7 +25,7 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
/** {@link StartingScanner} for the {@link CoreOptions#SCAN_TAG_NAME} of a
batch read. */
-public class StaticFromTagStartingScanner extends AbstractStartingScanner {
+public class StaticFromTagStartingScanner extends ReadPlanStartingScanner {
private final String tagName;
@@ -40,11 +40,10 @@ public class StaticFromTagStartingScanner extends
AbstractStartingScanner {
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
TagManager tagManager =
new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
Snapshot snapshot = tagManager.taggedSnapshot(tagName);
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot).read());
+ return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 1c039c0c7..a5087890a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
* {@link StartingScanner} for the {@link
CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
* batch read.
*/
-public class StaticFromTimestampStartingScanner extends
AbstractStartingScanner {
+public class StaticFromTimestampStartingScanner extends
ReadPlanStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
@@ -49,15 +49,14 @@ public class StaticFromTimestampStartingScanner extends
AbstractStartingScanner
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
if (startingSnapshotId == null) {
LOG.debug(
"There is currently no snapshot earlier than or equal to
timestamp[{}]",
startupMillis);
- return new NoSnapshot();
+ return null;
}
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
index 022fdf37f..72fba5533 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
/** {@link StartingScanner} for the {@link CoreOptions#SCAN_WATERMARK} of a
batch read. */
-public class StaticFromWatermarkStartingScanner extends
AbstractStartingScanner {
+public class StaticFromWatermarkStartingScanner extends
ReadPlanStartingScanner {
private static final Logger LOG =
LoggerFactory.getLogger(StaticFromWatermarkStartingScanner.class);
@@ -51,7 +51,7 @@ public class StaticFromWatermarkStartingScanner extends
AbstractStartingScanner
}
@Override
- public Result scan(SnapshotReader snapshotReader) {
+ public SnapshotReader configure(SnapshotReader snapshotReader) {
if (startingSnapshotId == null) {
LOG.warn(
"There is currently no snapshot later than or equal to
watermark[{}]",
@@ -62,8 +62,7 @@ public class StaticFromWatermarkStartingScanner extends
AbstractStartingScanner
+ "watermark[%d]",
watermark));
}
- return StartingScanner.fromPlan(
-
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+ return
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index d9cf80289..84c5fbd42 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -237,155 +237,155 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
    /**
     * Snapshot reader for the audit log table.
     *
     * <p>Every call is forwarded to the wrapped data-table reader. The only behavioral
     * difference is {@code withFilter}: predicates are expressed against the audit log
     * schema (which prepends the row-kind column), so they are first converted back to
     * the data table schema via {@code convert}; predicates that cannot be converted are
     * dropped instead of being pushed down.
     */
    private class AuditLogDataReader implements SnapshotReader {

        /** The underlying data-table snapshot reader doing the real work. */
        private final SnapshotReader wrapped;

        private AuditLogDataReader(SnapshotReader wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public Integer parallelism() {
            return wrapped.parallelism();
        }

        @Override
        public SnapshotManager snapshotManager() {
            return wrapped.snapshotManager();
        }

        @Override
        public ManifestsReader manifestsReader() {
            return wrapped.manifestsReader();
        }

        @Override
        public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
            return wrapped.readManifest(manifest);
        }

        @Override
        public ConsumerManager consumerManager() {
            return wrapped.consumerManager();
        }

        @Override
        public SplitGenerator splitGenerator() {
            return wrapped.splitGenerator();
        }

        @Override
        public FileStorePathFactory pathFactory() {
            return wrapped.pathFactory();
        }

        public SnapshotReader withSnapshot(long snapshotId) {
            wrapped.withSnapshot(snapshotId);
            return this;
        }

        public SnapshotReader withSnapshot(Snapshot snapshot) {
            wrapped.withSnapshot(snapshot);
            return this;
        }

        public SnapshotReader withFilter(Predicate predicate) {
            // Predicates refer to audit-log columns; convert to the data table schema
            // first. Unconvertible predicates are silently not pushed down.
            convert(predicate).ifPresent(wrapped::withFilter);
            return this;
        }

        @Override
        public SnapshotReader withPartitionFilter(Map<String, String> partitionSpec) {
            wrapped.withPartitionFilter(partitionSpec);
            return this;
        }

        @Override
        public SnapshotReader withPartitionFilter(Predicate predicate) {
            wrapped.withPartitionFilter(predicate);
            return this;
        }

        @Override
        public SnapshotReader withPartitionFilter(List<BinaryRow> partitions) {
            wrapped.withPartitionFilter(partitions);
            return this;
        }

        @Override
        public SnapshotReader withMode(ScanMode scanMode) {
            wrapped.withMode(scanMode);
            return this;
        }

        @Override
        public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
            wrapped.withLevelFilter(levelFilter);
            return this;
        }

        @Override
        public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> filter) {
            wrapped.withManifestEntryFilter(filter);
            return this;
        }

        public SnapshotReader withBucket(int bucket) {
            wrapped.withBucket(bucket);
            return this;
        }

        @Override
        public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
            wrapped.withBucketFilter(bucketFilter);
            return this;
        }

        @Override
        public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
            wrapped.withDataFileNameFilter(fileNameFilter);
            return this;
        }

        @Override
        public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
            wrapped.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
            return this;
        }

        @Override
        public SnapshotReader withMetricRegistry(MetricRegistry registry) {
            wrapped.withMetricRegistry(registry);
            return this;
        }

        @Override
        public Plan read() {
            return wrapped.read();
        }

        @Override
        public Plan readChanges() {
            return wrapped.readChanges();
        }

        @Override
        public Plan readIncrementalDiff(Snapshot before) {
            return wrapped.readIncrementalDiff(before);
        }

        @Override
        public List<BinaryRow> partitions() {
            return wrapped.partitions();
        }

        @Override
        public List<PartitionEntry> partitionEntries() {
            return wrapped.partitionEntries();
        }

        @Override
        public List<BucketEntry> bucketEntries() {
            return wrapped.bucketEntries();
        }
    }
diff --git
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index f870d8370..ee00d4183 100644
---
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -50,7 +50,8 @@ public class DataTableSource extends BaseDataTableSource {
null,
null,
null,
- null);
+ null,
+ false);
}
public DataTableSource(
@@ -62,7 +63,8 @@ public class DataTableSource extends BaseDataTableSource {
@Nullable Predicate predicate,
@Nullable int[][] projectFields,
@Nullable Long limit,
- @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
+ @Nullable WatermarkStrategy<RowData> watermarkStrategy,
+ boolean isBatchCountStar) {
super(
tableIdentifier,
table,
@@ -72,7 +74,8 @@ public class DataTableSource extends BaseDataTableSource {
predicate,
projectFields,
limit,
- watermarkStrategy);
+ watermarkStrategy,
+ isBatchCountStar);
}
@Override
@@ -86,7 +89,8 @@ public class DataTableSource extends BaseDataTableSource {
predicate,
projectFields,
limit,
- watermarkStrategy);
+ watermarkStrategy,
+ isBatchCountStar);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index a8660ee8c..37a504c58 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -49,7 +48,6 @@ public class DynamicPartitionLoader implements Serializable {
private final Table table;
private final Duration refreshInterval;
- private TableScan scan;
private Comparator<InternalRow> comparator;
private LocalDateTime lastRefresh;
@@ -61,7 +59,6 @@ public class DynamicPartitionLoader implements Serializable {
}
public void open() {
- this.scan = table.newReadBuilder().newScan();
RowType partitionType = table.rowType().project(table.partitionKeys());
this.comparator =
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
}
@@ -87,7 +84,10 @@ public class DynamicPartitionLoader implements Serializable {
}
BinaryRow previous = this.partition;
- partition =
scan.listPartitions().stream().max(comparator).orElse(null);
+ partition =
+ table.newReadBuilder().newScan().listPartitions().stream()
+ .max(comparator)
+ .orElse(null);
lastRefresh = LocalDateTime.now();
return !Objects.equals(previous, partition);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 15b82fbe4..e9389f1f2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -334,7 +334,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
/** Context for {@link LookupTable}. */
public static class Context {
- public final FileStoreTable table;
+ public final LookupFileStoreTable table;
public final int[] projection;
@Nullable public final Predicate tablePredicate;
@Nullable public final Predicate projectedPredicate;
@@ -361,7 +361,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
public Context copy(int[] newProjection) {
return new Context(
- table,
+ table.wrapped(),
newProjection,
tablePredicate,
projectedPredicate,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index fe064b80e..908884a57 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -38,8 +38,8 @@ import static
org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamSc
*/
public class LookupDataTableScan extends DataTableStreamScan {
- private StartupMode startupMode;
- private LookupStreamScanMode lookupScanMode;
+ private final StartupMode startupMode;
+ private final LookupStreamScanMode lookupScanMode;
public LookupDataTableScan(
CoreOptions options,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
index bb5274bc6..090399706 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
@@ -22,9 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.utils.TableScanUtils;
-import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
@@ -33,10 +30,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DelegatedFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.ReadBuilderImpl;
import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.utils.SimpleFileReader;
import java.util.HashSet;
import java.util.List;
@@ -63,11 +57,6 @@ public class LookupFileStoreTable extends
DelegatedFileStoreTable {
this.lookupScanMode = lookupScanMode;
}
- @Override
- public ReadBuilder newReadBuilder() {
- return new ReadBuilderImpl(this);
- }
-
@Override
public InnerTableRead newRead() {
switch (lookupScanMode) {
@@ -94,21 +83,6 @@ public class LookupFileStoreTable extends
DelegatedFileStoreTable {
lookupScanMode);
}
- @Override
- public SimpleFileReader<ManifestFileMeta> manifestListReader() {
- return wrapped.manifestListReader();
- }
-
- @Override
- public SimpleFileReader<ManifestEntry> manifestFileReader() {
- return wrapped.manifestFileReader();
- }
-
- @Override
- public SimpleFileReader<IndexManifestEntry> indexManifestFileReader() {
- return wrapped.indexManifestFileReader();
- }
-
@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
return new LookupFileStoreTable(wrapped.copy(dynamicOptions),
lookupScanMode);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index fa9b7672d..e6dfd41f8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -26,7 +26,6 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -50,14 +49,14 @@ import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping
/** A streaming reader to load data into {@link LookupTable}. */
public class LookupStreamingReader {
- private final Table table;
+ private final LookupFileStoreTable table;
private final int[] projection;
private final ReadBuilder readBuilder;
@Nullable private final Predicate projectedPredicate;
private final StreamTableScan scan;
public LookupStreamingReader(
- Table table,
+ LookupFileStoreTable table,
int[] projection,
@Nullable Predicate predicate,
Set<Integer> requireCachedBucketIds) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 8775ab8f5..9458f7817 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -28,12 +28,16 @@ import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Projection;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
@@ -41,19 +45,25 @@ import
org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
import
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
+import
org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.types.DataType;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.List;
+import java.util.Optional;
import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_REMOVE_NORMALIZE;
@@ -68,7 +78,7 @@ import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_
* batch mode or streaming mode.
*/
public abstract class BaseDataTableSource extends FlinkTableSource
- implements LookupTableSource, SupportsWatermarkPushDown {
+ implements LookupTableSource, SupportsWatermarkPushDown,
SupportsAggregatePushDown {
protected final ObjectIdentifier tableIdentifier;
protected final boolean streaming;
@@ -76,6 +86,7 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
@Nullable protected final LogStoreTableFactory logStoreTableFactory;
@Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
+ protected boolean isBatchCountStar;
public BaseDataTableSource(
ObjectIdentifier tableIdentifier,
@@ -86,7 +97,8 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
@Nullable Predicate predicate,
@Nullable int[][] projectFields,
@Nullable Long limit,
- @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
+ @Nullable WatermarkStrategy<RowData> watermarkStrategy,
+ boolean isBatchCountStar) {
super(table, predicate, projectFields, limit);
this.tableIdentifier = tableIdentifier;
this.streaming = streaming;
@@ -96,6 +108,7 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
this.projectFields = projectFields;
this.limit = limit;
this.watermarkStrategy = watermarkStrategy;
+ this.isBatchCountStar = isBatchCountStar;
}
@Override
@@ -110,7 +123,7 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
} else {
Options options = Options.fromMap(table.options());
- if (new CoreOptions(options).mergeEngine() ==
CoreOptions.MergeEngine.FIRST_ROW) {
+ if (new CoreOptions(options).mergeEngine() == FIRST_ROW) {
return ChangelogMode.insertOnly();
}
@@ -134,6 +147,10 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext)
{
+ if (isBatchCountStar) {
+ return createCountStarScan();
+ }
+
LogSourceProvider logSourceProvider = null;
if (logStoreTableFactory != null) {
logSourceProvider =
@@ -182,6 +199,29 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
.build());
}
+ private ScanRuntimeProvider createCountStarScan() {
+ TableScan scan =
table.newReadBuilder().withFilter(predicate).newScan();
+ List<PartitionEntry> partitionEntries = scan.listPartitionEntries();
+ long rowCount =
partitionEntries.stream().mapToLong(PartitionEntry::recordCount).sum();
+ NumberSequenceRowSource source = new NumberSequenceRowSource(rowCount,
rowCount);
+ return new SourceProvider() {
+ @Override
+ public Source<RowData, ?, ?> createSource() {
+ return source;
+ }
+
+ @Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
+ public Optional<Integer> getParallelism() {
+ return Optional.of(1);
+ }
+ };
+ }
+
protected abstract List<String> dynamicPartitionFilteringFields();
@Override
@@ -209,6 +249,54 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
asyncThreadNumber);
}
+ @Override
+ public boolean applyAggregates(
+ List<int[]> groupingSets,
+ List<AggregateExpression> aggregateExpressions,
+ DataType producedDataType) {
+ if (isStreaming()) {
+ return false;
+ }
+
+ if (!(table instanceof DataTable)) {
+ return false;
+ }
+
+ if (!table.primaryKeys().isEmpty()) {
+ return false;
+ }
+
+ CoreOptions options = ((DataTable) table).coreOptions();
+ if (options.deletionVectorsEnabled()) {
+ return false;
+ }
+
+ if (groupingSets.size() != 1) {
+ return false;
+ }
+
+ if (groupingSets.get(0).length != 0) {
+ return false;
+ }
+
+ if (aggregateExpressions.size() != 1) {
+ return false;
+ }
+
+ if (!aggregateExpressions
+ .get(0)
+ .getFunctionDefinition()
+ .getClass()
+ .getName()
+ .equals(
+
"org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction")) {
+ return false;
+ }
+
+ isBatchCountStar = true;
+ return true;
+ }
+
@Override
public String asSummaryString() {
return "Paimon-DataSource";
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 200550c88..ad5123205 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -62,7 +62,8 @@ public class DataTableSource extends BaseDataTableSource
null,
null,
null,
- null);
+ null,
+ false);
}
public DataTableSource(
@@ -75,7 +76,8 @@ public class DataTableSource extends BaseDataTableSource
@Nullable int[][] projectFields,
@Nullable Long limit,
@Nullable WatermarkStrategy<RowData> watermarkStrategy,
- @Nullable List<String> dynamicPartitionFilteringFields) {
+ @Nullable List<String> dynamicPartitionFilteringFields,
+ boolean isBatchCountStar) {
super(
tableIdentifier,
table,
@@ -85,7 +87,8 @@ public class DataTableSource extends BaseDataTableSource
predicate,
projectFields,
limit,
- watermarkStrategy);
+ watermarkStrategy,
+ isBatchCountStar);
this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
}
@@ -101,7 +104,8 @@ public class DataTableSource extends BaseDataTableSource
projectFields,
limit,
watermarkStrategy,
- dynamicPartitionFilteringFields);
+ dynamicPartitionFilteringFields,
+ isBatchCountStar);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 920a1ba14..2be0248f3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -22,11 +22,13 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
@@ -94,7 +96,8 @@ public abstract class FlinkTableSource
List<ResolvedExpression> unConsumedFilters = new ArrayList<>();
List<ResolvedExpression> consumedFilters = new ArrayList<>();
List<Predicate> converted = new ArrayList<>();
- PredicateVisitor<Boolean> visitor = new
PartitionPredicateVisitor(partitionKeys);
+ PredicateVisitor<Boolean> onlyPartFieldsVisitor =
+ new PartitionPredicateVisitor(partitionKeys);
for (ResolvedExpression filter : filters) {
Optional<Predicate> predicateOptional =
PredicateConverter.convert(rowType, filter);
@@ -103,7 +106,7 @@ public abstract class FlinkTableSource
unConsumedFilters.add(filter);
} else {
Predicate p = predicateOptional.get();
- if (isStreaming() || !p.visit(visitor)) {
+ if (isStreaming() || !p.visit(onlyPartFieldsVisitor)) {
unConsumedFilters.add(filter);
} else {
consumedFilters.add(filter);
@@ -168,9 +171,28 @@ public abstract class FlinkTableSource
protected void scanSplitsForInference() {
if (splitStatistics == null) {
- List<Split> splits =
-
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
- splitStatistics = new SplitStatistics(splits);
+ if (table instanceof DataTable) {
+ List<PartitionEntry> partitionEntries =
+ table.newReadBuilder()
+ .withFilter(predicate)
+ .newScan()
+ .listPartitionEntries();
+ long totalSize = 0;
+ long rowCount = 0;
+ for (PartitionEntry entry : partitionEntries) {
+ totalSize += entry.fileSizeInBytes();
+ rowCount += entry.recordCount();
+ }
+ long splitTargetSize = ((DataTable)
table).coreOptions().splitTargetSize();
+ splitStatistics =
+ new SplitStatistics((int) (totalSize / splitTargetSize
+ 1), rowCount);
+ } else {
+ List<Split> splits =
+
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+ splitStatistics =
+ new SplitStatistics(
+ splits.size(),
splits.stream().mapToLong(Split::rowCount).sum());
+ }
}
}
@@ -180,9 +202,9 @@ public abstract class FlinkTableSource
private final int splitNumber;
private final long totalRowCount;
- protected SplitStatistics(List<Split> splits) {
- this.splitNumber = splits.size();
- this.totalRowCount =
splits.stream().mapToLong(Split::rowCount).sum();
        // Statistics are precomputed by the caller (see scanSplitsForInference);
        // this class only stores the results.
        protected SplitStatistics(int splitNumber, long totalRowCount) {
            this.splitNumber = splitNumber;
            this.totalRowCount = totalRowCount;
        }
public int splitNumber() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NumberSequenceRowSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NumberSequenceRowSource.java
new file mode 100644
index 000000000..e467e37a8
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NumberSequenceRowSource.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.NumberSequenceIterator;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A data source that produces a sequence of numbers (longs) to {@link
RowData}. */
+public class NumberSequenceRowSource
+ implements Source<
+ RowData,
+ NumberSequenceRowSource.NumberSequenceSplit,
+
Collection<NumberSequenceRowSource.NumberSequenceSplit>>,
+ ResultTypeQueryable<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The starting number in the sequence, inclusive. */
+ private final long from;
+
+ /** The end number in the sequence, inclusive. */
+ private final long to;
+
    /**
     * Creates a new {@code NumberSequenceRowSource} that produces parallel sequences
     * covering the range {@code from} to {@code to} (both boundaries are inclusive).
     *
     * @throws IllegalArgumentException if {@code from > to}
     */
    public NumberSequenceRowSource(long from, long to) {
        checkArgument(from <= to, "'from' must be <= 'to'");
        this.from = from;
        this.to = to;
    }

    /** Returns the first number of the sequence (inclusive). */
    public long getFrom() {
        return from;
    }

    /** Returns the last number of the sequence (inclusive). */
    public long getTo() {
        return to;
    }
+
+ // ------------------------------------------------------------------------
+ // source methods
+ // ------------------------------------------------------------------------
+
    /** Rows carry a single non-null BIGINT column holding the sequence value. */
    @Override
    public TypeInformation<RowData> getProducedType() {
        return InternalTypeInfo.of(RowType.of(new BigIntType(false)));
    }

    /** The sequence is finite, so this source is always bounded. */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    @Override
    public SourceReader<RowData, NumberSequenceSplit> createReader(
            SourceReaderContext readerContext) {
        // Reading is driven entirely by the split's iterator; the generic
        // iterator-based reader needs no source-specific logic.
        return new IteratorSourceReader<>(readerContext);
    }

    @Override
    public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator(
            final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {

        // Pre-split the range once, one sub-range per reader at current parallelism.
        final List<NumberSequenceSplit> splits =
                splitNumberRange(from, to, enumContext.currentParallelism());
        return new IteratorSourceEnumerator<>(enumContext, splits);
    }

    @Override
    public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> restoreEnumerator(
            final SplitEnumeratorContext<NumberSequenceSplit> enumContext,
            Collection<NumberSequenceSplit> checkpoint) {
        // The enumerator checkpoint is simply the collection of unassigned splits.
        return new IteratorSourceEnumerator<>(enumContext, checkpoint);
    }

    @Override
    public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer() {
        return new SplitSerializer();
    }

    @Override
    public SimpleVersionedSerializer<Collection<NumberSequenceSplit>>
            getEnumeratorCheckpointSerializer() {
        return new CheckpointSerializer();
    }
+
+ protected List<NumberSequenceSplit> splitNumberRange(long from, long to,
int numSplits) {
+ final NumberSequenceIterator[] subSequences =
+ new NumberSequenceIterator(from, to).split(numSplits);
+ final ArrayList<NumberSequenceSplit> splits = new
ArrayList<>(subSequences.length);
+
+ int splitId = 1;
+ for (NumberSequenceIterator seq : subSequences) {
+ if (seq.hasNext()) {
+ splits.add(
+ new NumberSequenceSplit(
+ String.valueOf(splitId++), seq.getCurrent(),
seq.getTo()));
+ }
+ }
+
+ return splits;
+ }
+
+ // ------------------------------------------------------------------------
+ // splits & checkpoint
+ // ------------------------------------------------------------------------
+
/** A split of the source, representing a number sub-sequence (both boundaries inclusive). */
public static class NumberSequenceSplit
        implements IteratorSourceSplit<
                RowData, NumberSequenceRowSource.NumberSequenceIterator> {

    private final String splitId;
    private final long from;
    private final long to;

    public NumberSequenceSplit(String splitId, long from, long to) {
        checkArgument(from <= to, "'from' must be <= 'to'");
        this.splitId = checkNotNull(splitId);
        this.from = from;
        this.to = to;
    }

    @Override
    public String splitId() {
        return splitId;
    }

    /** First value of this sub-sequence (inclusive). */
    public long from() {
        return from;
    }

    /** Last value of this sub-sequence (inclusive). */
    public long to() {
        return to;
    }

    @SuppressWarnings("ClassEscapesDefinedScope")
    @Override
    public NumberSequenceRowSource.NumberSequenceIterator getIterator() {
        return new NumberSequenceRowSource.NumberSequenceIterator(from, to);
    }

    @SuppressWarnings("ClassEscapesDefinedScope")
    @Override
    public IteratorSourceSplit<RowData, NumberSequenceRowSource.NumberSequenceIterator>
            getUpdatedSplitForIterator(
                    final NumberSequenceRowSource.NumberSequenceIterator iterator) {
        // Capture the iterator's current position so a restored reader resumes at the
        // next unread value rather than re-emitting the whole sub-sequence.
        return new NumberSequenceSplit(splitId, iterator.getCurrent(), iterator.getTo());
    }

    @Override
    public String toString() {
        return String.format("NumberSequenceSplit [%d, %d] (%s)", from, to, splitId);
    }
}
+
/** Versioned serializer for {@link NumberSequenceSplit} (currently version 1). */
private static final class SplitSerializer
        implements SimpleVersionedSerializer<NumberSequenceSplit> {

    private static final int CURRENT_VERSION = 1;

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public byte[] serialize(NumberSequenceSplit split) throws IOException {
        checkArgument(
                split.getClass() == NumberSequenceSplit.class, "cannot serialize subclasses");

        // We will serialize 2 longs (16 bytes) plus the UTF representation of the string (2 +
        // length)
        final DataOutputSerializer out =
                new DataOutputSerializer(split.splitId().length() + 18);
        serializeV1(out, split);
        return out.getCopyOfBuffer();
    }

    @Override
    public NumberSequenceSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != CURRENT_VERSION) {
            throw new IOException("Unrecognized version: " + version);
        }
        final DataInputDeserializer in = new DataInputDeserializer(serialized);
        return deserializeV1(in);
    }

    // The static V1 helpers below are also used by CheckpointSerializer, which writes
    // the same per-split layout: UTF split id, then from, then to.
    static void serializeV1(DataOutputView out, NumberSequenceSplit split) throws IOException {
        out.writeUTF(split.splitId());
        out.writeLong(split.from());
        out.writeLong(split.to());
    }

    static NumberSequenceSplit deserializeV1(DataInputView in) throws IOException {
        return new NumberSequenceSplit(in.readUTF(), in.readLong(), in.readLong());
    }
}
+
/**
 * Versioned serializer for enumerator checkpoints, i.e. the collection of splits not yet
 * fully consumed (currently version 1).
 */
private static final class CheckpointSerializer
        implements SimpleVersionedSerializer<Collection<NumberSequenceSplit>> {

    private static final int CURRENT_VERSION = 1;

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public byte[] serialize(Collection<NumberSequenceSplit> checkpoint) throws IOException {
        // Each split needs 2 longs (16 bytes) plus the UTF representation of the string (2 +
        // length)
        // Assuming at most 4 digit split IDs, 22 bytes per split avoids any intermediate array
        // resizing.
        // plus four bytes for the length field
        final DataOutputSerializer out = new DataOutputSerializer(checkpoint.size() * 22 + 4);
        out.writeInt(checkpoint.size());
        for (NumberSequenceSplit split : checkpoint) {
            SplitSerializer.serializeV1(out, split);
        }
        return out.getCopyOfBuffer();
    }

    @Override
    public Collection<NumberSequenceSplit> deserialize(int version, byte[] serialized)
            throws IOException {
        if (version != CURRENT_VERSION) {
            throw new IOException("Unrecognized version: " + version);
        }
        final DataInputDeserializer in = new DataInputDeserializer(serialized);
        final int num = in.readInt();
        final ArrayList<NumberSequenceSplit> result = new ArrayList<>(num);
        for (int remaining = num; remaining > 0; remaining--) {
            result.add(SplitSerializer.deserializeV1(in));
        }
        return result;
    }
}
+
/**
 * A splittable iterator over an inclusive range of longs, emitting each value wrapped in a
 * single-column {@link GenericRowData}. Splitting partitions the remaining range into
 * contiguous sub-ranges; the overflow-aware arithmetic in {@link #split(int)} handles ranges
 * whose length does not fit in a signed long.
 */
private static class NumberSequenceIterator extends SplittableIterator<RowData> {

    private static final long serialVersionUID = 1L;

    /** The last number returned by the iterator. */
    private final long to;

    /** The next number to be returned. */
    private long current;

    /**
     * Creates a new splittable iterator, returning the range [from, to]. Both boundaries of the
     * interval are inclusive.
     *
     * @param from The first number returned by the iterator.
     * @param to The last number returned by the iterator.
     */
    public NumberSequenceIterator(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException(
                    "The 'to' value must not be smaller than the 'from' value.");
        }

        this.current = from;
        this.to = to;
    }

    /**
     * Internal constructor to allow for empty iterators.
     *
     * @param from The first number returned by the iterator.
     * @param to The last number returned by the iterator.
     * @param unused A dummy parameter to disambiguate the constructor.
     */
    @SuppressWarnings("unused")
    private NumberSequenceIterator(long from, long to, boolean unused) {
        this.current = from;
        this.to = to;
    }

    /** Returns the next number that will be emitted (the iterator's position). */
    public long getCurrent() {
        return this.current;
    }

    /** Returns the last number of this iterator's range (inclusive). */
    public long getTo() {
        return this.to;
    }

    @Override
    public boolean hasNext() {
        return current <= to;
    }

    @Override
    public RowData next() {
        if (current <= to) {
            return GenericRowData.of(current++);
        } else {
            throw new NoSuchElementException();
        }
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    @Override
    public NumberSequenceIterator[] split(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }

        if (numPartitions == 1) {
            return new NumberSequenceIterator[] {new NumberSequenceIterator(current, to)};
        }

        // here, numPartitions >= 2 !!!

        long elementsPerSplit;

        if (to - current + 1 >= 0) {
            elementsPerSplit = (to - current + 1) / numPartitions;
        } else {
            // long overflow of the range.
            // we compute based on half the distance, to prevent the overflow.
            // in most cases it holds that: current < 0 and to > 0, except for: to == 0 and
            // current
            // == Long.MIN_VALUE
            // the latter needs a special case
            final long halfDiff; // must be positive

            if (current == Long.MIN_VALUE) {
                // this means to >= 0
                halfDiff = (Long.MAX_VALUE / 2 + 1) + to / 2;
            } else {
                long posFrom = -current;
                if (posFrom > to) {
                    halfDiff = to + ((posFrom - to) / 2);
                } else {
                    halfDiff = posFrom + ((to - posFrom) / 2);
                }
            }
            elementsPerSplit = halfDiff / numPartitions * 2;
        }

        // figure out how many get one in addition
        long numWithExtra = -(elementsPerSplit * numPartitions) + to - current + 1;

        // based on rounding errors, we may have lost one
        if (numWithExtra > numPartitions) {
            elementsPerSplit++;
            numWithExtra -= numPartitions;

            if (numWithExtra > numPartitions) {
                throw new RuntimeException("Bug in splitting logic. Too much rounding loss.");
            }
        }

        NumberSequenceIterator[] iters = new NumberSequenceIterator[numPartitions];
        long curr = current;
        int i = 0;
        // the first numWithExtra partitions each get one extra element
        for (; i < numWithExtra; i++) {
            long next = curr + elementsPerSplit + 1;
            iters[i] = new NumberSequenceIterator(curr, next - 1);
            curr = next;
        }
        // the remaining partitions get exactly elementsPerSplit elements; the private
        // constructor is used because these sub-ranges may be empty (from > to)
        for (; i < numPartitions; i++) {
            long next = curr + elementsPerSplit;
            iters[i] = new NumberSequenceIterator(curr, next - 1, true);
            curr = next;
        }

        return iters;
    }

    @Override
    public int getMaximumNumberOfSplits() {
        // Clamp to Integer.MAX_VALUE when the remaining range exceeds int capacity.
        if (to >= Integer.MAX_VALUE
                || current <= Integer.MIN_VALUE
                || to - current + 1 >= Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        } else {
            return (int) (to - current + 1);
        }
    }
}
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index c60443a12..f03a16368 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -19,10 +19,12 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
@@ -528,4 +530,61 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
DateTimeUtils.toInternal(timestamp,
0), 0)))
.containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"));
}
+
+ @Test
+ public void testCountStarAppend() {
+ sql("CREATE TABLE count_append (f0 INT, f1 STRING)");
+ sql("INSERT INTO count_append VALUES (1, 'a'), (2, 'b')");
+
+ String sql = "SELECT COUNT(*) FROM count_append";
+ assertThat(sql(sql)).containsOnly(Row.of(2L));
+ validateCount1PushDown(sql);
+ }
+
+ @Test
+ public void testCountStarPartAppend() {
+ sql("CREATE TABLE count_part_append (f0 INT, f1 STRING, dt STRING)
PARTITIONED BY (dt)");
+ sql("INSERT INTO count_part_append VALUES (1, 'a', '1'), (1, 'a',
'1'), (2, 'b', '2')");
+ String sql = "SELECT COUNT(*) FROM count_part_append WHERE dt = '1'";
+
+ assertThat(sql(sql)).containsOnly(Row.of(2L));
+ validateCount1PushDown(sql);
+ }
+
+ @Test
+ public void testCountStarAppendWithDv() {
+ sql(
+ "CREATE TABLE count_append_dv (f0 INT, f1 STRING) WITH
('deletion-vectors.enabled' = 'true')");
+ sql("INSERT INTO count_append_dv VALUES (1, 'a'), (2, 'b')");
+
+ String sql = "SELECT COUNT(*) FROM count_append_dv";
+ assertThat(sql(sql)).containsOnly(Row.of(2L));
+ validateCount1NotPushDown(sql);
+ }
+
+ @Test
+ public void testCountStarPK() {
+ sql("CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1
STRING)");
+ sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b')");
+
+ String sql = "SELECT COUNT(*) FROM count_pk";
+ assertThat(sql(sql)).containsOnly(Row.of(2L));
+ validateCount1NotPushDown(sql);
+ }
+
+ private void validateCount1PushDown(String sql) {
+ Transformation<?> transformation = AbstractTestBase.translate(tEnv,
sql);
+ while (!transformation.getInputs().isEmpty()) {
+ transformation = transformation.getInputs().get(0);
+ }
+
assertThat(transformation.getDescription()).contains("Count1AggFunction");
+ }
+
+ private void validateCount1NotPushDown(String sql) {
+ Transformation<?> transformation = AbstractTestBase.translate(tEnv,
sql);
+ while (!transformation.getInputs().isEmpty()) {
+ transformation = transformation.getInputs().get(0);
+ }
+
assertThat(transformation.getDescription()).doesNotContain("Count1AggFunction");
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
new file mode 100644
index 000000000..8404d994f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.fail;
+
/**
 * An integration test for the sources based on iterators.
 *
 * <p>This test uses the {@link NumberSequenceRowSource} as a concrete iterator source
 * implementation, but covers all runtime-related aspects for all the iterator-based sources
 * together.
 */
public class IteratorSourcesITCase extends TestLogger {

    private static final int PARALLELISM = 4;

    // One shared mini-cluster with enough slots to run the source at full test parallelism.
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .build());

    // ------------------------------------------------------------------------

    @Test
    public void testParallelSourceExecution() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        final DataStream<RowData> stream =
                env.fromSource(
                        new NumberSequenceRowSource(1L, 1_000L),
                        WatermarkStrategy.noWatermarks(),
                        "iterator source");

        final List<RowData> result =
                DataStreamUtils.collectBoundedStream(stream, "Iterator Source Test");

        verifySequence(result, 1L, 1_000L);
    }

    // ------------------------------------------------------------------------
    //  test utils
    // ------------------------------------------------------------------------

    // Checks that `sequence` contains exactly the numbers from..to, in any order
    // (a parallel source emits its sub-ranges interleaved, hence the sort below).
    private static void verifySequence(
            final List<RowData> sequence, final long from, final long to) {
        if (sequence.size() != to - from + 1) {
            fail(String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, sequence));
        }

        final List<Long> list =
                sequence.stream()
                        .map(r -> r.getLong(0))
                        .sorted(Long::compareTo)
                        .collect(Collectors.toList());

        int pos = 0;
        for (long value = from; value <= to; value++, pos++) {
            if (value != list.get(pos)) {
                fail(String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, list));
            }
        }
    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/NumberSequenceRowSourceTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/NumberSequenceRowSourceTest.java
new file mode 100644
index 000000000..5f17b842f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/NumberSequenceRowSourceTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.fail;
+
/** Tests for the {@link NumberSequenceRowSource}. */
class NumberSequenceRowSourceTest {

    // Repeatedly checkpoints and re-creates the reader mid-stream and verifies that the
    // emitted records still form the exact original sequence with no gaps or duplicates.
    @Test
    void testReaderCheckpoints() throws Exception {
        final long from = 177;
        final long mid = 333;
        final long to = 563;
        final long elementsPerCycle = (to - from) / 3;

        final TestingReaderOutput<RowData> out = new TestingReaderOutput<>();

        SourceReader<RowData, NumberSequenceRowSource.NumberSequenceSplit> reader = createReader();
        reader.addSplits(
                Arrays.asList(
                        new NumberSequenceRowSource.NumberSequenceSplit("split-1", from, mid),
                        new NumberSequenceRowSource.NumberSequenceSplit("split-2", mid + 1, to)));

        long remainingInCycle = elementsPerCycle;
        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
            if (--remainingInCycle <= 0) {
                remainingInCycle = elementsPerCycle;
                // checkpoint
                List<NumberSequenceRowSource.NumberSequenceSplit> splits = reader.snapshotState(1L);

                // re-create and restore
                reader = createReader();
                if (splits.isEmpty()) {
                    reader.notifyNoMoreSplits();
                } else {
                    reader.addSplits(splits);
                }
            }
        }

        final List<RowData> result = out.getEmittedRecords();
        validateSequence(result, from, to);
    }

    // Asserts that `sequence` is exactly from, from+1, ..., to in order.
    private static void validateSequence(
            final List<RowData> sequence, final long from, final long to) {
        if (sequence.size() != to - from + 1) {
            failSequence(sequence, from, to);
        }

        long nextExpected = from;
        for (RowData next : sequence) {
            if (next.getLong(0) != nextExpected++) {
                failSequence(sequence, from, to);
            }
        }
    }

    private static void failSequence(final List<RowData> sequence, final long from, final long to) {
        fail(
                String.format(
                        "Expected: A sequence [%d, %d], but found: sequence (size %d) : %s",
                        from, to, sequence.size(), sequence));
    }

    private static SourceReader<RowData, NumberSequenceRowSource.NumberSequenceSplit>
            createReader() {
        // the arguments passed in the source constructor matter only to the enumerator
        return new NumberSequenceRowSource(0L, 0L).createReader(new DummyReaderContext());
    }

    // ------------------------------------------------------------------------
    //  test utils / mocks
    //
    //  the "flink-connector-test-utils module has proper mocks and utils,
    //  but cannot be used here, because it would create a cyclic dependency.
    // ------------------------------------------------------------------------

    /** Minimal reader context stub: unregistered metrics, no-op event/split requests. */
    private static final class DummyReaderContext implements SourceReaderContext {

        @Override
        public SourceReaderMetricGroup metricGroup() {
            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
        }

        @Override
        public Configuration getConfiguration() {
            return new Configuration();
        }

        @Override
        public String getLocalHostName() {
            return "localhost";
        }

        @Override
        public int getIndexOfSubtask() {
            return 0;
        }

        @Override
        public void sendSplitRequest() {}

        @Override
        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}

        @Override
        public UserCodeClassLoader getUserCodeClassLoader() {
            return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
        }

        @Override
        public int currentParallelism() {
            return 1;
        }
    }

    /**
     * Collects every emitted record into a list; watermark/idleness operations are
     * unsupported because this test never uses event time.
     */
    private static final class TestingReaderOutput<E> implements ReaderOutput<E> {

        private final ArrayList<E> emittedRecords = new ArrayList<>();

        @Override
        public void collect(E record) {
            emittedRecords.add(record);
        }

        @Override
        public void collect(E record, long timestamp) {
            collect(record);
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void markIdle() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void markActive() {
            throw new UnsupportedOperationException();
        }

        @Override
        public SourceOutput<E> createOutputForSplit(String splitId) {
            return this;
        }

        @Override
        public void releaseOutputForSplit(String splitId) {}

        public ArrayList<E> getEmittedRecords() {
            return emittedRecords;
        }
    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
index 8b69ef285..f8aadb8bd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
@@ -87,7 +87,8 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null,
null,
- null);
+ null,
+ false);
Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L);
// TODO validate column statistics
}
@@ -107,7 +108,8 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null,
null,
- null);
+ null,
+ false);
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L);
// TODO validate column statistics
}
@@ -127,7 +129,8 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null,
null,
- null);
+ null,
+ false);
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L);
// TODO validate column statistics
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
index 8e4643ef5..f5d412167 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
@@ -51,7 +51,8 @@ public class PrimaryKeyTableStatisticsTest extends
FileStoreTableStatisticsTestB
null,
null,
null,
- null);
+ null,
+ false);
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L);
// TODO validate column statistics
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index f09ab0924..ce0017eb1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.util;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -33,14 +34,22 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.CollectModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.file.Path;
import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
import java.util.UUID;
/** Similar to Flink's AbstractTestBase but using Junit5. */
@@ -303,4 +312,39 @@ public class AbstractTestBase {
return env;
}
}
+
+ public static Transformation<?> translate(TableEnvironment env, String
statement) {
+ TableEnvironmentImpl envImpl = (TableEnvironmentImpl) env;
+ List<Operation> operations = envImpl.getParser().parse(statement);
+
+ if (operations.size() != 1) {
+ throw new RuntimeException("No operation after parsing for " +
statement);
+ }
+
+ Operation operation = operations.get(0);
+ if (operation instanceof QueryOperation) {
+ QueryOperation queryOperation = (QueryOperation) operation;
+ CollectModifyOperation sinkOperation = new
CollectModifyOperation(queryOperation);
+ List<Transformation<?>> transformations;
+ try {
+ Method translate =
+
TableEnvironmentImpl.class.getDeclaredMethod("translate", List.class);
+ translate.setAccessible(true);
+ //noinspection unchecked
+ transformations =
+ (List<Transformation<?>>)
+ translate.invoke(envImpl,
Collections.singletonList(sinkOperation));
+ } catch (NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (transformations.size() != 1) {
+ throw new RuntimeException("No transformation after
translating for " + statement);
+ }
+
+ return transformations.get(0);
+ } else {
+ throw new RuntimeException();
+ }
+ }
}