This is an automated email from the ASF dual-hosted git repository.
chamikaramj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 782953e3422 Add Delta Lake source to the Java Managed API (#38902)
782953e3422 is described below
commit 782953e342243e04f0c6b6d05b926c3ab1ef49a8
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Thu Jun 11 09:56:42 2026 -0700
Add Delta Lake source to the Java Managed API (#38902)
---
.../model/pipeline/v1/external_transforms.proto | 2 +
sdks/java/io/delta/build.gradle | 1 +
.../io/delta/DeltaReadSchemaTransformProvider.java | 142 +++++++++++++++++++++
.../org/apache/beam/sdk/io/delta/DeltaIOTest.java | 41 ++++++
.../java/org/apache/beam/sdk/managed/Managed.java | 4 +
5 files changed, 190 insertions(+)
diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index 043a72dd34f..c73986eed48 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -88,6 +88,8 @@ message ManagedTransforms {
"beam:schematransform:org.apache.beam:sql_server_read:v1"];
SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:sql_server_write:v1"];
+ DELTA_LAKE_READ = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:delta_lake_read:v1"];
}
}
diff --git a/sdks/java/io/delta/build.gradle b/sdks/java/io/delta/build.gradle
index c07aef6981b..57b1cd8ad87 100644
--- a/sdks/java/io/delta/build.gradle
+++ b/sdks/java/io/delta/build.gradle
@@ -33,6 +33,7 @@ def parquet_version = "1.16.0"
dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation project(path: ":model:pipeline", configuration: "shadow")
implementation library.java.delta_kernel_api
implementation library.java.delta_kernel_defaults
diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java
new file mode 100644
index 00000000000..42ca3f24def
--- /dev/null
+++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaReadSchemaTransformProvider.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.delta;
+
+import static
org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import
org.apache.beam.sdk.io.delta.DeltaReadSchemaTransformProvider.Configuration;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * SchemaTransform implementation for {@link DeltaIO#readRows}. Reads records
from Delta Lake and
+ * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link
+ * org.apache.beam.sdk.values.Row}s.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class DeltaReadSchemaTransformProvider extends
TypedSchemaTransformProvider<Configuration> {
+ static final String OUTPUT_TAG = "output";
+
+ @Override
+ protected SchemaTransform from(Configuration configuration) {
+ return new DeltaReadSchemaTransform(configuration);
+ }
+
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ @Override
+ public String identifier() {
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ);
+ }
+
+ static class DeltaReadSchemaTransform extends SchemaTransform {
+ private final Configuration configuration;
+
+ DeltaReadSchemaTransform(Configuration configuration) {
+ this.configuration =
+ java.util.Objects.requireNonNull(configuration, "configuration
cannot be null");
+ }
+
+ Row getConfigurationRow() {
+ try {
+ return SchemaRegistry.createDefault()
+ .getToRowFunction(Configuration.class)
+ .apply(configuration)
+ .sorted()
+ .toSnakeCase();
+ } catch (NoSuchSchemaException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ DeltaIO.ReadRows read =
DeltaIO.readRows().from(configuration.getTable());
+ if (configuration.getVersion() != null) {
+ read = read.withVersion(configuration.getVersion());
+ }
+ if (configuration.getTimestamp() != null) {
+ read = read.withTimestamp(configuration.getTimestamp());
+ }
+ Map<String, String> hadoopConfig = configuration.getHadoopConfig();
+ if (hadoopConfig != null) {
+ read = read.withConfig(hadoopConfig);
+ }
+
+ PCollection<Row> output = input.getPipeline().apply(read);
+
+ return PCollectionRowTuple.of(OUTPUT_TAG, output);
+ }
+ }
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Configuration {
+ static Builder builder() {
+ return new
AutoValue_DeltaReadSchemaTransformProvider_Configuration.Builder();
+ }
+
+ @SchemaFieldDescription("Identifier of the Delta Lake table.")
+ abstract String getTable();
+
+ @SchemaFieldDescription("Version of the Delta Lake table to read.")
+ @Nullable
+ abstract Long getVersion();
+
+ @SchemaFieldDescription("Timestamp of the Delta Lake table to read.")
+ @Nullable
+ abstract String getTimestamp();
+
+ @SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
+ @Nullable
+ abstract Map<String, String> getHadoopConfig();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setTable(String table);
+
+ abstract Builder setVersion(@Nullable Long version);
+
+ abstract Builder setTimestamp(@Nullable String timestamp);
+
+ abstract Builder setHadoopConfig(@Nullable Map<String, String>
hadoopConfig);
+
+ abstract Configuration build();
+ }
+ }
+}
diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
index ef6dd660c60..bd8bf8b3c8c 100644
--- a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
+++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.delta.DeltaIO.ReadRows;
import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -52,6 +53,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -387,6 +389,45 @@ public class DeltaIOTest {
return java.nio.file.Files.readAllBytes(file.toPath());
}
+ @Test
+ public void testManagedDeltaRead() throws Exception {
+ File tableDir = tempFolder.newFolder("managed-delta-table");
+
+ // 1. Write a Parquet file to simulate a Delta table
+ Schema schema = Schema.builder().addField("name",
Schema.FieldType.STRING).build();
+ Row row = Row.withSchema(schema).addValues("test-name").build();
+ writeParquetFile(new File(tableDir, "part-00000.parquet"), row);
+
+ // 2. Create the Delta log
+ File logDir = new File(tableDir, "_delta_log");
+ logDir.mkdirs();
+ File commitFile = new File(logDir, "00000000000000000000.json");
+
+ File parquetFile = new File(tableDir, "part-00000.parquet");
+ byte[] fileBytes = Files.readAllBytes(parquetFile.toPath());
+
+ String commitContent =
+ "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n"
+ +
"{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\",\"partitionColumns\":[],\"configuration\":{},\"createdAt\":123456789}}\n"
+ +
"{\"add\":{\"path\":\"part-00000.parquet\",\"partitionValues\":{},\"size\":"
+ + fileBytes.length
+ + ",\"modificationTime\":123456789,\"dataChange\":true}}";
+
+ Files.write(commitFile.toPath(),
commitContent.getBytes(StandardCharsets.UTF_8));
+
+ // 3. Read it using Managed
+ PCollection<Row> output =
+ readPipeline
+ .apply(
+ Managed.read(Managed.DELTA_LAKE)
+ .withConfig(ImmutableMap.of("table",
tableDir.getAbsolutePath())))
+ .getSinglePCollection();
+
+ PAssert.that(output).containsInAnyOrder(row);
+
+ readPipeline.run().waitUntilFinish();
+ }
+
@Test
@org.junit.Ignore("Manual integration test with external local table")
public void testReadingLocalTable() throws Exception {
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index a5e7d879b44..9589992e079 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -93,6 +93,7 @@ public class Managed {
// TODO: Dynamically generate a list of supported transforms
public static final String ICEBERG = "iceberg";
+ public static final String DELTA_LAKE = "delta";
public static final String ICEBERG_CDC = "iceberg_cdc";
public static final String KAFKA = "kafka";
public static final String BIGQUERY = "bigquery";
@@ -104,6 +105,7 @@ public class Managed {
public static final Map<String, String> READ_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG,
getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
+ .put(DELTA_LAKE,
getUrn(ExternalTransforms.ManagedTransforms.Urns.DELTA_LAKE_READ))
.put(ICEBERG_CDC,
getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ))
.put(KAFKA,
getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
.put(BIGQUERY,
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
@@ -128,6 +130,8 @@ public class Managed {
* <ul>
* <li>{@link Managed#ICEBERG} : Read from Apache Iceberg tables using <a
*
href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/iceberg/IcebergIO.html">IcebergIO</a>
+ * <li>{@link Managed#DELTA_LAKE} : Read from Delta Lake tables using <a
+ *
href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/delta/DeltaIO.html">DeltaIO</a>
* <li>{@link Managed#ICEBERG_CDC} : CDC Read from Apache Iceberg tables
using <a
*
href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/iceberg/IcebergIO.html">IcebergIO</a>
* <li>{@link Managed#KAFKA} : Read from Apache Kafka topics using <a