This is an automated email from the ASF dual-hosted git repository.
chamikaramj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b6aaf42e92e Initial skeleton for the Delta Lake source (#38571)
b6aaf42e92e is described below
commit b6aaf42e92eff572752783cc92efd354e8bb8d7a
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Thu May 21 10:00:56 2026 -0700
Initial skeleton for the Delta Lake source (#38571)
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 3 +
sdks/java/io/delta/build.gradle | 39 ++++++++++
.../java/org/apache/beam/sdk/io/delta/DeltaIO.java | 91 ++++++++++++++++++++++
.../org/apache/beam/sdk/io/delta/package-info.java | 24 ++++++
.../org/apache/beam/sdk/io/delta/DeltaIOTest.java | 61 +++++++++++++++
settings.gradle.kts | 1 +
6 files changed, 219 insertions(+)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 27b9cef9637..5ca0de9de84 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -605,6 +605,7 @@ class BeamModulePlugin implements Plugin<Project> {
def cdap_version = "6.5.1"
def checkerframework_version = "3.42.0"
def classgraph_version = "4.8.162"
+ def delta_lake_version = "4.2.0"
def dbcp2_version = "2.9.0"
def errorprone_version = "2.31.0"
// [bomupgrader] determined by: com.google.api:gax, consistent with:
google_cloud_platform_libraries_bom
@@ -729,6 +730,8 @@ class BeamModulePlugin implements Plugin<Project> {
commons_logging :
"commons-logging:commons-logging:1.2",
commons_math3 :
"org.apache.commons:commons-math3:3.6.1",
dbcp2 :
"org.apache.commons:commons-dbcp2:$dbcp2_version",
+ delta_kernel_api :
"io.delta:delta-kernel-api:$delta_lake_version",
+ delta_kernel_defaults :
"io.delta:delta-kernel-defaults:$delta_lake_version",
envoy_control_plane_api :
"io.envoyproxy.controlplane:api:1.0.49",
error_prone_annotations :
"com.google.errorprone:error_prone_annotations:$errorprone_version",
failsafe :
"dev.failsafe:failsafe:3.3.0",
diff --git a/sdks/java/io/delta/build.gradle b/sdks/java/io/delta/build.gradle
new file mode 100644
index 00000000000..617965b3bc4
--- /dev/null
+++ b/sdks/java/io/delta/build.gradle
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build definition for the Delta Lake IO module; applies the shared Beam module plugin.
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+ automaticModuleName: 'org.apache.beam.sdk.io.delta',
+ // Latest version of the Delta Kernel API requires Java 17.
+ requireJavaVersion: JavaVersion.VERSION_17,
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Delta Lake"
+ext.summary = "Integration with Delta Lake."
+
+
+dependencies {
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation library.java.delta_kernel_api
+ implementation library.java.delta_kernel_defaults
+
+ // NOTE(review): presumably required because the skeleton DeltaIO does not reference any
+ // Delta Kernel classes yet - revisit (and likely drop) once the read path is implemented.
+ permitUnusedDeclared library.java.delta_kernel_api
+ permitUnusedDeclared library.java.delta_kernel_defaults
+
+ testImplementation library.java.junit
+}
diff --git
a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java
b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java
new file mode 100644
index 00000000000..6c5df4728b4
--- /dev/null
+++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.delta;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A connector that reads from <a href="https://delta.io/">Delta Lake</a> tables.
+ *
+ * <p>This is work in progress. For more details and to track progress, please see <a
+ * href="https://github.com/apache/beam/issues/21100">Issue 21100</a>.
+ */
+@Internal
+public class DeltaIO {
+
+  // Holder for static factory methods only; not meant to be instantiated.
+  private DeltaIO() {}
+
+  /**
+   * Creates a {@link ReadRows} transform with no properties set.
+   *
+   * <p>Configure it with {@link ReadRows#from}, {@link ReadRows#withVersion}, {@link
+   * ReadRows#withTimestamp} and {@link ReadRows#withConfig}.
+   */
+  public static ReadRows readRows() {
+    return new AutoValue_DeltaIO_ReadRows.Builder().build();
+  }
+
+  /**
+   * Configuration for reading a Delta Lake table as a {@link PCollection} of {@link Row}s.
+   *
+   * <p>Instances are immutable: every {@code from}/{@code with...} method returns a new transform
+   * and leaves the receiver untouched. Expansion is not implemented yet, so applying this
+   * transform to a pipeline currently throws {@link UnsupportedOperationException}.
+   */
+  @AutoValue
+  public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+
+    /** Returns the configured table path, or {@code null} if {@link #from} was never called. */
+    public abstract @Nullable String getTablePath();
+
+    /** Returns the configured table version, or {@code null} if none was set. */
+    public abstract @Nullable Long getVersion();
+
+    /**
+     * Returns the configured timestamp string, or {@code null} if none was set. The value is
+     * stored as given; no format validation happens in this class.
+     */
+    public abstract @Nullable String getTimestamp();
+
+    /**
+     * Returns the configuration entries supplied through {@link #withConfig} as an unmodifiable
+     * map, or {@code null} if none were set.
+     */
+    public abstract @Nullable Map<String, String> getHadoopConfig();
+
+    abstract Builder toBuilder();
+
+    /** AutoValue builder; package-private, users go through the fluent methods on ReadRows. */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setTablePath(String tablePath);
+
+      abstract Builder setVersion(@Nullable Long version);
+
+      abstract Builder setTimestamp(@Nullable String timestamp);
+
+      abstract Builder setHadoopConfig(@Nullable Map<String, String> hadoopConfig);
+
+      abstract ReadRows build();
+    }
+
+    /**
+     * Returns a copy of this transform that reads the table at {@code tablePath}.
+     *
+     * @throws IllegalArgumentException if {@code tablePath} is null
+     */
+    public ReadRows from(String tablePath) {
+      // Fail at pipeline-construction time rather than carrying a null path into expansion.
+      if (tablePath == null) {
+        throw new IllegalArgumentException("tablePath can not be null");
+      }
+      return toBuilder().setTablePath(tablePath).build();
+    }
+
+    /** Returns a copy of this transform with the given table version ({@code null} clears it). */
+    public ReadRows withVersion(@Nullable Long version) {
+      return toBuilder().setVersion(version).build();
+    }
+
+    /** Returns a copy of this transform with the given timestamp ({@code null} clears it). */
+    public ReadRows withTimestamp(@Nullable String timestamp) {
+      return toBuilder().setTimestamp(timestamp).build();
+    }
+
+    /**
+     * Returns a copy of this transform carrying the given configuration entries, exposed through
+     * {@link #getHadoopConfig()}.
+     *
+     * <p>The entries are copied, so later changes to {@code config} do not affect the returned
+     * transform.
+     *
+     * @throws IllegalArgumentException if {@code config} is null
+     * @throws NullPointerException if {@code config} contains a null key or value
+     */
+    public ReadRows withConfig(Map<String, String> config) {
+      if (config == null) {
+        throw new IllegalArgumentException("config can not be null");
+      }
+      // Snapshot the caller's map: the transform is a value type and must not change (or change
+      // its equals/hashCode) when the caller keeps mutating the original map.
+      return toBuilder().setHadoopConfig(Map.copyOf(config)).build();
+    }
+
+    @Override
+    public PCollection<Row> expand(PBegin input) {
+      // TODO(https://github.com/apache/beam/issues/38551): Implement expansion for
+      // Delta Lake ReadRows
+      throw new UnsupportedOperationException("Not implemented yet.");
+    }
+  }
+}
diff --git
a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/package-info.java
b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/package-info.java
new file mode 100644
index 00000000000..c765a7fb765
--- /dev/null
+++
b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from Delta Lake.
+ *
+ * @see org.apache.beam.sdk.io.delta.DeltaIO
+ */
+package org.apache.beam.sdk.io.delta;
diff --git
a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
new file mode 100644
index 00000000000..b09fa8f69b3
--- /dev/null
+++
b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.delta;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.delta.DeltaIO.ReadRows;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for the {@link DeltaIO}. */
+@RunWith(JUnit4.class)
+public class DeltaIOTest {
+
+  /** Each value set through the fluent API must come back unchanged from its getter. */
+  @Test
+  public void testReadRowsBuilderAndGetters() {
+    Map<String, String> expectedConfig = new HashMap<>();
+    expectedConfig.put("fs.defaultFS", "file:///");
+    String expectedPath = "/path/to/table";
+    Long expectedVersion = 5L;
+    String expectedTimestamp = "2026-05-20T15:43:26Z";
+
+    // Apply the settings one step at a time; every call yields a fresh ReadRows.
+    ReadRows configured = DeltaIO.readRows();
+    configured = configured.from(expectedPath);
+    configured = configured.withVersion(expectedVersion);
+    configured = configured.withTimestamp(expectedTimestamp);
+    configured = configured.withConfig(expectedConfig);
+
+    Assert.assertEquals(expectedPath, configured.getTablePath());
+    Assert.assertEquals(expectedVersion, configured.getVersion());
+    Assert.assertEquals(expectedTimestamp, configured.getTimestamp());
+    Assert.assertEquals(expectedConfig, configured.getHadoopConfig());
+  }
+
+  /** A freshly created transform has no property set. */
+  @Test
+  public void testReadRowsNullDefaults() {
+    ReadRows untouched = DeltaIO.readRows();
+
+    Assert.assertNull(untouched.getTablePath());
+    Assert.assertNull(untouched.getVersion());
+    Assert.assertNull(untouched.getTimestamp());
+    Assert.assertNull(untouched.getHadoopConfig());
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 9c6bd2b3059..1ead92a9cfc 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -226,6 +226,7 @@ include(":sdks:java:io:file-based-io-tests")
include(":sdks:java:io:bigquery-io-perf-tests")
include(":sdks:java:io:cdap")
include(":sdks:java:io:csv")
+include(":sdks:java:io:delta")
include(":sdks:java:io:datadog")
include(":sdks:java:io:file-schema-transform")
include(":sdks:java:io:google-ads")