This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2e1577e62b Flink: Backport support source watermark for flink sql windows (#12697)
2e1577e62b is described below
commit 2e1577e62bf023daddb0ad7aec5ce6f094fb98dc
Author: Swapna Marru <[email protected]>
AuthorDate: Tue Apr 1 15:27:43 2025 -0700
Flink: Backport support source watermark for flink sql windows (#12697)
backports (#12191)
---
.../iceberg/flink/source/IcebergTableSource.java | 16 ++++++-
.../iceberg/flink/source/TestIcebergSourceSql.java | 54 +++++++++++++++++++++-
.../apache/iceberg/flink/source/TestSqlBase.java | 15 ++++++
.../iceberg/flink/source/IcebergTableSource.java | 16 ++++++-
.../iceberg/flink/source/TestIcebergSourceSql.java | 54 +++++++++++++++++++++-
.../apache/iceberg/flink/source/TestSqlBase.java | 15 ++++++
6 files changed, 166 insertions(+), 4 deletions(-)
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 65adce77d9..662dc30e27 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -35,12 +35,14 @@ import
org.apache.flink.table.connector.source.ScanTableSource;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkFilters;
+import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -53,7 +55,8 @@ public class IcebergTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsFilterPushDown,
- SupportsLimitPushDown {
+ SupportsLimitPushDown,
+ SupportsSourceWatermark {
private int[] projectedFields;
private Long limit;
@@ -175,6 +178,17 @@ public class IcebergTableSource
return Result.of(acceptedFilters, flinkFilters);
}
+ @Override
+ public void applySourceWatermark() {
+ Preconditions.checkArgument(
+
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE),
+ "Source watermarks are supported only in flip-27 iceberg source
implementation");
+
+ Preconditions.checkNotNull(
+ properties.get(FlinkReadOptions.WATERMARK_COLUMN),
+ "watermark-column needs to be configured to use source watermark.");
+ }
+
@Override
public boolean supportsNestedProjection() {
// TODO: support nested projection
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index c8f0b8172d..0cdaf8371c 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.source;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOException;
import java.time.Instant;
@@ -40,6 +41,7 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
@BeforeEach
@Override
public void before() throws IOException {
- TableEnvironment tableEnvironment = getTableEnv();
+ setUpTableEnv(getTableEnv());
+ setUpTableEnv(getStreamingTableEnv());
+ }
+
+ private static void setUpTableEnv(TableEnvironment tableEnvironment) {
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE,
true);
// Disable inferring parallelism to avoid interfering watermark tests
@@ -72,6 +78,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,
true);
}
+ @AfterEach
+ public void after() throws IOException {
+ CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+ }
+
private Record generateRecord(Instant t1, long t2) {
Record record = GenericRecord.create(SCHEMA_TS);
record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
@@ -178,4 +189,45 @@ public class TestIcebergSourceSql extends TestSqlBase {
expected,
SCHEMA_TS);
}
+
+ @Test
+ public void testWatermarkInvalidConfig() {
+ CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
SCHEMA_TS);
+
+ String flinkTable = "`default_catalog`.`default_database`.flink_table";
+ SqlHelpers.sql(
+ getStreamingTableEnv(),
+ "CREATE TABLE %s "
+ + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+ + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE
iceberg_catalog.`default`.%s",
+ flinkTable,
+ TestFixtures.TABLE);
+
+ assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT *
FROM %s", flinkTable))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("watermark-column needs to be configured to use source
watermark.");
+ }
+
+ @Test
+ public void testWatermarkValidConfig() throws Exception {
+ List<Record> expected = generateExpectedRecords(true);
+
+ String flinkTable = "`default_catalog`.`default_database`.flink_table";
+
+ SqlHelpers.sql(
+ getStreamingTableEnv(),
+ "CREATE TABLE %s "
+ + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+ + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH
('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s",
+ flinkTable,
+ TestFixtures.TABLE);
+
+ TestHelpers.assertRecordsWithOrder(
+ SqlHelpers.sql(
+ getStreamingTableEnv(),
+ "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS),
INTERVAL '1' SECOND))",
+ flinkTable),
+ expected,
+ SCHEMA_TS);
+ }
}
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index f9b776397c..dd63154fe0 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -63,6 +63,8 @@ public abstract class TestSqlBase {
private volatile TableEnvironment tEnv;
+ private volatile TableEnvironment streamingTEnv;
+
protected TableEnvironment getTableEnv() {
if (tEnv == null) {
synchronized (this) {
@@ -75,6 +77,19 @@ public abstract class TestSqlBase {
return tEnv;
}
+ protected TableEnvironment getStreamingTableEnv() {
+ if (streamingTEnv == null) {
+ synchronized (this) {
+ if (streamingTEnv == null) {
+ this.streamingTEnv =
+
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ }
+ }
+ }
+
+ return streamingTEnv;
+ }
+
@BeforeEach
public abstract void before() throws IOException;
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 65adce77d9..662dc30e27 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -35,12 +35,14 @@ import
org.apache.flink.table.connector.source.ScanTableSource;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkFilters;
+import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -53,7 +55,8 @@ public class IcebergTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsFilterPushDown,
- SupportsLimitPushDown {
+ SupportsLimitPushDown,
+ SupportsSourceWatermark {
private int[] projectedFields;
private Long limit;
@@ -175,6 +178,17 @@ public class IcebergTableSource
return Result.of(acceptedFilters, flinkFilters);
}
+ @Override
+ public void applySourceWatermark() {
+ Preconditions.checkArgument(
+
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE),
+ "Source watermarks are supported only in flip-27 iceberg source
implementation");
+
+ Preconditions.checkNotNull(
+ properties.get(FlinkReadOptions.WATERMARK_COLUMN),
+ "watermark-column needs to be configured to use source watermark.");
+ }
+
@Override
public boolean supportsNestedProjection() {
// TODO: support nested projection
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
index c8f0b8172d..0cdaf8371c 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.source;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOException;
import java.time.Instant;
@@ -40,6 +41,7 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -53,7 +55,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
@BeforeEach
@Override
public void before() throws IOException {
- TableEnvironment tableEnvironment = getTableEnv();
+ setUpTableEnv(getTableEnv());
+ setUpTableEnv(getStreamingTableEnv());
+ }
+
+ private static void setUpTableEnv(TableEnvironment tableEnvironment) {
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE,
true);
// Disable inferring parallelism to avoid interfering watermark tests
@@ -72,6 +78,11 @@ public class TestIcebergSourceSql extends TestSqlBase {
tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,
true);
}
+ @AfterEach
+ public void after() throws IOException {
+ CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+ }
+
private Record generateRecord(Instant t1, long t2) {
Record record = GenericRecord.create(SCHEMA_TS);
record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
@@ -178,4 +189,45 @@ public class TestIcebergSourceSql extends TestSqlBase {
expected,
SCHEMA_TS);
}
+
+ @Test
+ public void testWatermarkInvalidConfig() {
+ CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER,
SCHEMA_TS);
+
+ String flinkTable = "`default_catalog`.`default_database`.flink_table";
+ SqlHelpers.sql(
+ getStreamingTableEnv(),
+ "CREATE TABLE %s "
+ + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+ + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) LIKE
iceberg_catalog.`default`.%s",
+ flinkTable,
+ TestFixtures.TABLE);
+
+ assertThatThrownBy(() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT *
FROM %s", flinkTable))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("watermark-column needs to be configured to use source
watermark.");
+ }
+
+ @Test
+ public void testWatermarkValidConfig() throws Exception {
+ List<Record> expected = generateExpectedRecords(true);
+
+ String flinkTable = "`default_catalog`.`default_database`.flink_table";
+
+ SqlHelpers.sql(
+ getStreamingTableEnv(),
+ "CREATE TABLE %s "
+ + "(eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+ + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH
('watermark-column'='t1') LIKE iceberg_catalog.`default`.%s",
+ flinkTable,
+ TestFixtures.TABLE);
+
+ TestHelpers.assertRecordsWithOrder(
+ SqlHelpers.sql(
+ getStreamingTableEnv(),
+ "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS),
INTERVAL '1' SECOND))",
+ flinkTable),
+ expected,
+ SCHEMA_TS);
+ }
}
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
index f9b776397c..dd63154fe0 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestSqlBase.java
@@ -63,6 +63,8 @@ public abstract class TestSqlBase {
private volatile TableEnvironment tEnv;
+ private volatile TableEnvironment streamingTEnv;
+
protected TableEnvironment getTableEnv() {
if (tEnv == null) {
synchronized (this) {
@@ -75,6 +77,19 @@ public abstract class TestSqlBase {
return tEnv;
}
+ protected TableEnvironment getStreamingTableEnv() {
+ if (streamingTEnv == null) {
+ synchronized (this) {
+ if (streamingTEnv == null) {
+ this.streamingTEnv =
+
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ }
+ }
+ }
+
+ return streamingTEnv;
+ }
+
@BeforeEach
public abstract void before() throws IOException;