This is an automated email from the ASF dual-hosted git repository. ahmedabualsaud pushed a commit to branch revert-35981-update_iceberg in repository https://gitbox.apache.org/repos/asf/beam.git
commit ef1d032e17bd7b4dce981376d824dac094e0c366 Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Sep 10 11:16:08 2025 -0400 Revert "Update iceberg to 1.9.2 (#35981)" This reverts commit 32f5be6c36c17ae87e1b3dc11426b95c352d3584. --- .../IO_Iceberg_Integration_Tests.json | 2 +- .../IO_Iceberg_Integration_Tests_Dataflow.json | 2 +- ...Iceberg_Managed_Integration_Tests_Dataflow.json | 2 +- .github/trigger_files/beam_PostCommit_SQL.json | 2 +- CHANGES.md | 2 - build.gradle.kts | 3 - examples/java/build.gradle | 3 + examples/java/iceberg/build.gradle | 89 ---------------------- .../cookbook}/IcebergBatchWriteExample.java | 2 +- .../cookbook}/IcebergRestCatalogCDCExample.java | 2 +- .../IcebergRestCatalogStreamingWriteExample.java | 2 +- .../examples/cookbook}/IcebergTaxiExamples.java | 2 +- sdks/java/extensions/sql/build.gradle | 7 ++ sdks/java/extensions/sql/iceberg/build.gradle | 81 -------------------- .../provider/iceberg/IcebergCatalogRegistrar.java | 31 -------- .../sql/meta/catalog/InMemoryCatalogRegistrar.java | 6 +- .../sql/meta/provider/iceberg/IcebergCatalog.java | 0 .../sql/meta/provider/iceberg/IcebergFilter.java | 0 .../sql/meta/provider/iceberg/IcebergTable.java | 0 .../provider/iceberg/IcebergTableProvider.java | 0 .../sql/meta/provider/iceberg/package-info.java | 0 .../sdk/extensions/sql}/PubsubToIcebergIT.java | 3 +- .../provider/iceberg/BeamSqlCliIcebergTest.java | 0 .../meta/provider/iceberg/IcebergFilterTest.java | 0 .../meta/provider/iceberg/IcebergReadWriteIT.java | 6 +- .../provider/iceberg/IcebergTableProviderTest.java | 0 sdks/java/io/expansion-service/build.gradle | 15 ++-- sdks/java/io/iceberg/build.gradle | 12 +-- .../sdk/io/iceberg/RecordWriterManagerTest.java | 40 ++-------- settings.gradle.kts | 4 - 30 files changed, 44 insertions(+), 274 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 34a6e02150e..37dd25bf902 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 4 + "modification": 3 } diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json index 5abe02fc09c..8fab48cc672 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 5 } diff --git a/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json index 5abe02fc09c..8fab48cc672 100644 --- a/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json +++ b/.github/trigger_files/IO_Iceberg_Managed_Integration_Tests_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 5 } diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index 833fd9b0d17..5ac8a7f3f6e 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run ", - "modification": 2 + "modification": 4 } diff --git a/CHANGES.md b/CHANGES.md index e59e28b6083..72f99543532 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -103,7 +103,6 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* Upgraded Iceberg dependency to 1.9.2 ([#35981](https://github.com/apache/beam/pull/35981)) ## New Features / Improvements @@ -132,7 +131,6 @@ significant digits related to casting. * (Python) The deterministic fallback coder for complex types like NamedTuple, Enum, and dataclasses now uses cloudpickle instead of dill. If your pipeline is affected, you may see a warning like: "Using fallback deterministic coder for type X...". You can revert to the previous behavior by using the pipeline option `--update_compatibility_version=2.67.0` ([35725](https://github.com/apache/beam/pull/35725)). Report any pickling related issues to [#34903](https://github.com/apache/beam/is [...] * (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. -* Dropped Java 8 support for [IO expansion-service](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service). Cross-language pipelines using this expansion service will need a Java11+ runtime ([#35981](https://github.com/apache/beam/pull/35981). ## Deprecations diff --git a/build.gradle.kts b/build.gradle.kts index 5ca2e29b4ed..4ee27a8454b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -253,7 +253,6 @@ tasks.register("javaPreCommit") { dependsOn(":examples:java:sql:preCommit") dependsOn(":examples:java:twitter:build") dependsOn(":examples:java:twitter:preCommit") - dependsOn(":examples:java:iceberg:build") dependsOn(":examples:multi-language:build") dependsOn(":model:fn-execution:build") dependsOn(":model:job-management:build") @@ -381,7 +380,6 @@ tasks.register("sqlPreCommit") { dependsOn(":sdks:java:extensions:sql:datacatalog:build") dependsOn(":sdks:java:extensions:sql:expansion-service:build") dependsOn(":sdks:java:extensions:sql:hcatalog:build") - dependsOn(":sdks:java:extensions:sql:iceberg:build") dependsOn(":sdks:java:extensions:sql:jdbc:build") dependsOn(":sdks:java:extensions:sql:jdbc:preCommit") dependsOn(":sdks:java:extensions:sql:perf-tests:build") @@ -428,7 +426,6 @@ tasks.register("sqlPostCommit") { dependsOn(":sdks:java:extensions:sql:postCommit") dependsOn(":sdks:java:extensions:sql:jdbc:postCommit") dependsOn(":sdks:java:extensions:sql:datacatalog:postCommit") - dependsOn(":sdks:java:extensions:sql:iceberg:integrationTest") dependsOn(":sdks:java:extensions:sql:hadoopVersionsTest") } diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 6f35a109998..0f1a1f7ef7e 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -71,6 +71,9 @@ dependencies { implementation project(":sdks:java:extensions:python") implementation project(":sdks:java:io:google-cloud-platform") implementation project(":sdks:java:io:kafka") + runtimeOnly project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:bqms") + implementation project(":sdks:java:managed") implementation project(":sdks:java:extensions:ml") implementation library.java.avro implementation library.java.bigdataoss_util diff --git a/examples/java/iceberg/build.gradle b/examples/java/iceberg/build.gradle deleted file mode 100644 index 09ef64d32ee..00000000000 --- a/examples/java/iceberg/build.gradle +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -plugins { - id 'java' - id 'org.apache.beam.module' - id 'com.gradleup.shadow' -} - -applyJavaNature( - exportJavadoc: false, - automaticModuleName: 'org.apache.beam.examples.iceberg', - // iceberg requires Java11+ - requireJavaVersion: JavaVersion.VERSION_11 -) - -description = "Apache Beam :: Examples :: Java :: Iceberg" -ext.summary = """Apache Beam Java SDK examples using IcebergIO.""" - -/** Define the list of runners which execute a precommit test. - * Some runners are run from separate projects, see the preCommit task below - * for details. - */ -def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"] -// The following runners have configuration created but not added to preCommit -def nonPreCommitRunners = ["dataflowRunner", "prismRunner"] -for (String runner : preCommitRunners) { - configurations.create(runner + "PreCommit") -} -for (String runner: nonPreCommitRunners) { - configurations.create(runner + "PreCommit") -} -configurations.sparkRunnerPreCommit { - // Ban certain dependencies to prevent a StackOverflow within Spark - // because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14 - exclude group: "org.slf4j", module: "jul-to-slf4j" - exclude group: "org.slf4j", module: "slf4j-jdk14" -} - -dependencies { - implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) - runtimeOnly project(":sdks:java:io:iceberg") - runtimeOnly project(":sdks:java:io:iceberg:bqms") - implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation project(":sdks:java:extensions:google-cloud-platform-core") - implementation project(":sdks:java:io:google-cloud-platform") - implementation project(":sdks:java:managed") - implementation library.java.google_auth_library_oauth2_http - implementation library.java.joda_time - runtimeOnly project(path: ":runners:direct-java", configuration: "shadow") - implementation library.java.vendored_guava_32_1_2_jre - runtimeOnly library.java.hadoop_client - runtimeOnly library.java.bigdataoss_gcs_connector - - // Add dependencies for the PreCommit configurations - // For each runner a project level dependency on the examples project. - for (String runner : preCommitRunners) { - delegate.add(runner + "PreCommit", project(path: ":examples:java", configuration: "testRuntimeMigration")) - } - directRunnerPreCommit project(path: ":runners:direct-java", configuration: "shadow") - flinkRunnerPreCommit project(":runners:flink:${project.ext.latestFlinkVersion}") - sparkRunnerPreCommit project(":runners:spark:3") - sparkRunnerPreCommit project(":sdks:java:io:hadoop-file-system") - dataflowRunnerPreCommit project(":runners:google-cloud-dataflow-java") - dataflowRunnerPreCommit project(":runners:google-cloud-dataflow-java:worker") // v2 worker - dataflowRunnerPreCommit project(":sdks:java:harness") // v2 worker - prismRunnerPreCommit project(":runners:prism:java") - - // Add dependency if requested on command line for runner - if (project.hasProperty("runnerDependency")) { - runtimeOnly project(path: project.getProperty("runnerDependency")) - } -} diff --git a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java similarity index 99% rename from examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java rename to examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java index 2a5f85e524e..458f2b54545 100644 --- a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.iceberg; +package org.apache.beam.examples.cookbook; import java.io.IOException; import java.util.Map; diff --git a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java similarity index 99% rename from examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java rename to examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java index 4229e401ab9..ecc047a5694 100644 --- a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogCDCExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogCDCExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.iceberg; +package org.apache.beam.examples.cookbook; import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC; diff --git a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java similarity index 99% rename from examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java rename to examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java index 0ea73cdf0c8..63dc4ff7056 100644 --- a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergRestCatalogStreamingWriteExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.iceberg; +package org.apache.beam.examples.cookbook; import com.google.auth.oauth2.GoogleCredentials; import java.io.IOException; diff --git a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java similarity index 99% rename from examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java rename to examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java index 5b4fe1b9b91..446d11d03be 100644 --- a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergTaxiExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.examples.iceberg; +package org.apache.beam.examples.cookbook; import java.util.Arrays; import java.util.Map; diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index afbc87f8eeb..2ad4ddf306b 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle @@ -76,6 +76,10 @@ dependencies { fmppTask "org.freemarker:freemarker:2.3.31" fmppTemplates library.java.vendored_calcite_1_40_0 implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(":sdks:java:managed") + implementation project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:bqms") + runtimeOnly project(":sdks:java:io:iceberg:hive") implementation project(":sdks:java:extensions:avro") implementation project(":sdks:java:extensions:join-library") permitUnusedDeclared project(":sdks:java:extensions:join-library") // BEAM-11761 @@ -116,6 +120,9 @@ dependencies { permitUnusedDeclared library.java.hadoop_client provided library.java.kafka_clients + implementation "org.apache.iceberg:iceberg-api:1.6.1" + permitUnusedDeclared "org.apache.iceberg:iceberg-api:1.6.1" // errorprone crash cannot find this transient dep + testImplementation "org.apache.iceberg:iceberg-core:1.6.1" testImplementation library.java.vendored_calcite_1_40_0 testImplementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit diff --git a/sdks/java/extensions/sql/iceberg/build.gradle b/sdks/java/extensions/sql/iceberg/build.gradle deleted file mode 100644 index d5f9e74c53b..00000000000 --- a/sdks/java/extensions/sql/iceberg/build.gradle +++ /dev/null @@ -1,81 +0,0 @@ -import groovy.json.JsonOutput - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -plugins { id 'org.apache.beam.module' } - -applyJavaNature( - automaticModuleName: 'org.apache.beam.sdk.extensions.sql.meta.provider.hcatalog', - // iceberg requires Java11+ - requireJavaVersion: JavaVersion.VERSION_11, -) - -dependencies { - implementation project(":sdks:java:extensions:sql") - implementation project(":sdks:java:core") - implementation project(":sdks:java:managed") - implementation project(":sdks:java:io:iceberg") - runtimeOnly project(":sdks:java:io:iceberg:bqms") - runtimeOnly project(":sdks:java:io:iceberg:hive") - // TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency - provided "org.immutables:value:2.8.8" - permitUnusedDeclared "org.immutables:value:2.8.8" - implementation library.java.slf4j_api - implementation library.java.vendored_guava_32_1_2_jre - implementation library.java.vendored_calcite_1_40_0 - implementation library.java.jackson_databind - - testImplementation library.java.joda_time - testImplementation library.java.junit - testImplementation library.java.google_api_services_bigquery - testImplementation "org.apache.iceberg:iceberg-api:1.9.2" - testImplementation "org.apache.iceberg:iceberg-core:1.9.2" - testImplementation project(":sdks:java:io:google-cloud-platform") - testImplementation project(":sdks:java:extensions:google-cloud-platform-core") -} - -task integrationTest(type: Test) { - def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' - def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/' - - // Disable Gradle cache (it should not be used because the IT's won't run). - outputs.upToDateWhen { false } - - def pipelineOptions = [ - "--project=${gcpProject}", - "--tempLocation=${gcsTempRoot}", - "--blockOnRun=false"] - - systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions) - - include '**/*IT.class' - - maxParallelForks 4 - classpath = project(":sdks:java:extensions:sql:iceberg") - .sourceSets - .test - .runtimeClasspath - testClassesDirs = files(project(":sdks:java:extensions:sql:iceberg").sourceSets.test.output.classesDirs) - useJUnit { } -} - -configurations.all { - // iceberg-core needs avro:1.12.0 - resolutionStrategy.force 'org.apache.avro:avro:1.12.0' -} diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java deleted file mode 100644 index 03c524f7b0f..00000000000 --- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalogRegistrar.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; - -import com.google.auto.service.AutoService; -import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; -import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogRegistrar; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; - -@AutoService(CatalogRegistrar.class) -public class IcebergCatalogRegistrar implements CatalogRegistrar { - @Override - public Iterable<Class<? extends Catalog>> getCatalogs() { - return ImmutableList.<Class<? extends Catalog>>builder().add(IcebergCatalog.class).build(); - } -} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java index afffa24e6cd..2d94e19c168 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalogRegistrar.java @@ -18,12 +18,16 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import com.google.auto.service.AutoService; +import org.apache.beam.sdk.extensions.sql.meta.provider.iceberg.IcebergCatalog; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @AutoService(CatalogRegistrar.class) public class InMemoryCatalogRegistrar implements CatalogRegistrar { @Override public Iterable<Class<? extends Catalog>> getCatalogs() { - return ImmutableList.<Class<? extends Catalog>>builder().add(InMemoryCatalog.class).build(); + return ImmutableList.<Class<? extends Catalog>>builder() + .add(InMemoryCatalog.class) + .add(IcebergCatalog.class) + .build(); } } diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java similarity index 100% rename from sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java similarity index 100% rename from sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilter.java diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java similarity index 100% rename from sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java similarity index 100% rename from sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java similarity index 100% rename from sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/package-info.java diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java similarity index 98% rename from sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java index bdd710c861e..7343c9b9a52 100644 --- a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; +package org.apache.beam.sdk.extensions.sql; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; @@ -33,7 +33,6 @@ import java.util.List; import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.extensions.sql.SqlTransform; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub; diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java similarity index 100% rename from sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java similarity index 100% rename from sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergFilterTest.java diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java similarity index 98% rename from sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java index a7b128b2bca..a223194b8e9 100644 --- a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; import static java.lang.String.format; import static java.util.Arrays.asList; +import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; import static org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE; import static org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT; @@ -63,7 +64,6 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.joda.time.Duration; -import org.joda.time.format.DateTimeFormat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -238,9 +238,7 @@ public class IcebergReadWriteIT { (float) 1.0, 1.0, true, - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS") - .withZoneUTC() - .parseDateTime("2018-05-28 20:17:40.123"), + parseTimestampWithUTCTimeZone("2018-05-28 20:17:40.123"), "varchar", "char", asList("123", "456"), diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java similarity index 100% rename from sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 08c3f2b051d..a70eb7ea931 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -25,8 +25,6 @@ applyJavaNature( exportJavadoc: false, validateShadowJar: false, shadowClosure: {}, - // iceberg requires Java11+ - requireJavaVersion: JavaVersion.VERSION_11 ) // We don't want to use the latest version for the entire beam sdk since beam Java users can override it themselves. @@ -67,17 +65,18 @@ dependencies { permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 implementation project(":sdks:java:managed") permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761 + implementation project(":sdks:java:io:iceberg") + permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761 implementation project(":sdks:java:io:kafka") permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761 implementation project(":sdks:java:io:kafka:upgrade") permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 - if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') { - // iceberg ended support for Java 8 in 1.7.0 - runtimeOnly project(":sdks:java:io:iceberg") - runtimeOnly project(":sdks:java:io:iceberg:hive") - runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") - } + // **** IcebergIO catalogs **** + // HiveCatalog + runtimeOnly project(path: ":sdks:java:io:iceberg:hive") + // BigQueryMetastoreCatalog (Java 11+) + runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 0f0fa0a2bb9..ba1d27b0e3e 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -23,8 +23,6 @@ import java.util.stream.Collectors plugins { id 'org.apache.beam.module' } applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.iceberg', - // iceberg ended support for Java 8 in 1.7.0 - requireJavaVersion: JavaVersion.VERSION_11, ) description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg" @@ -39,7 +37,10 @@ def hadoopVersions = [ hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} -def iceberg_version = "1.9.2" +// we cannot upgrade this since the newer iceberg requires Java 11 +// many other modules like examples/expansion use Java 8 and have the iceberg dependency +// def iceberg_version = "1.9.0" +def iceberg_version = "1.6.1" def parquet_version = "1.15.2" def orc_version = "1.9.2" def hive_version = "3.1.3" @@ -106,11 +107,6 @@ dependencies { } } -configurations.all { - // iceberg-core needs avro:1.12.0 - resolutionStrategy.force 'org.apache.avro:avro:1.12.0' -} - hadoopVersions.each {kv -> configurations."hadoopVersion$kv.key" { resolutionStrategy { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 36b74967f0b..b240442deb6 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -34,8 +34,8 @@ import java.net.URLEncoder; import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; -import java.time.LocalTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,15 +59,11 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.DateTimeUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import org.joda.time.ReadableDateTime; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -536,35 +532,13 @@ public class RecordWriterManagerTest { assertEquals(1, dataFile.getRecordCount()); // build this string: bool=true/int=1/long=1/float=1.0/double=1.0/str=str List<String> expectedPartitions = new ArrayList<>(); - - for (PartitionField field : spec.fields()) { - String name = field.name(); - Type type = spec.schema().findType(name); - Transform<Object, Object> transform = (Transform<Object, Object>) field.transform(); - String val; - switch (name) { - case "date": - LocalDate localDate = checkStateNotNull(row.getValue(name)); - Integer day = Integer.parseInt(String.valueOf(localDate.toEpochDay())); - val = transform.toHumanString(type, day); - break; - case "time": - LocalTime localTime = checkStateNotNull(row.getValue(name)); - val = transform.toHumanString(type, localTime.toNanoOfDay() / 1000); - break; - case "datetime": - LocalDateTime ldt = checkStateNotNull(row.getValue(name)); - val = transform.toHumanString(type, DateTimeUtil.microsFromTimestamp(ldt)); - break; - case "datetime_tz": - ReadableDateTime dt = checkStateNotNull(row.getDateTime(name)); - val = transform.toHumanString(type, dt.getMillis() * 1000); - break; - default: - val = transform.toHumanString(type, checkStateNotNull(row.getValue(name))); - break; + List<String> dateTypes = Arrays.asList("date", "time", "datetime", "datetime_tz"); + for (Schema.Field field : primitiveTypeSchema.getFields()) { + Object val = checkStateNotNull(row.getValue(field.getName())); + if (dateTypes.contains(field.getName())) { + val = URLEncoder.encode(val.toString(), UTF_8.toString()); } - expectedPartitions.add(name + "=" + URLEncoder.encode(val, UTF_8.toString())); + expectedPartitions.add(field.getName() + "=" + val); } String expectedPartitionPath = String.join("/", expectedPartitions); assertEquals(expectedPartitionPath, dataFile.getPartitionPath()); diff --git a/settings.gradle.kts b/settings.gradle.kts index a773571e6ca..135d9da42b0 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -372,7 +372,3 @@ include("sdks:java:io:iceberg:bqms") findProject(":sdks:java:io:iceberg:bqms")?.name = "bqms" include("it:clickhouse") findProject(":it:clickhouse")?.name = "clickhouse" -include("sdks:java:extensions:sql:iceberg") -findProject(":sdks:java:extensions:sql:iceberg")?.name = "iceberg" -include("examples:java:iceberg") -findProject(":examples:java:iceberg")?.name = "iceberg"