This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e18596c7fc4 Temporarily remove BQMS catalog until it is open-sourced
(#32386)
e18596c7fc4 is described below
commit e18596c7fc498e7f05ac3c32bb67cfc264fc76f2
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Aug 30 20:40:36 2024 -0400
Temporarily remove BQMS catalog until it is open-sourced (#32386)
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
sdks/java/io/expansion-service/build.gradle | 2 -
.../java/io/iceberg/bigquerymetastore/build.gradle | 56 -----
sdks/java/io/iceberg/build.gradle | 6 -
.../sdk/io/iceberg/BigQueryMetastoreCatalogIT.java | 274 ---------------------
settings.gradle.kts | 2 -
6 files changed, 1 insertion(+), 341 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 253d9796f90..1efc8e9e440 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 8
+ "modification": 1
}
diff --git a/sdks/java/io/expansion-service/build.gradle
b/sdks/java/io/expansion-service/build.gradle
index 37ee03eba00..498950b3dc4 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -52,8 +52,6 @@ dependencies {
// Needed for HiveCatalog
runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration:
"shadow")
- // Needed for BigQuery Metastore catalog (this isn't supported for java 8)
- runtimeOnly project(path: ":sdks:java:io:iceberg:bigquerymetastore",
configuration: "shadow")
runtimeOnly library.java.kafka_clients
runtimeOnly library.java.slf4j_jdk14
diff --git a/sdks/java/io/iceberg/bigquerymetastore/build.gradle
b/sdks/java/io/iceberg/bigquerymetastore/build.gradle
deleted file mode 100644
index 20e4a33b09f..00000000000
--- a/sdks/java/io/iceberg/bigquerymetastore/build.gradle
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
- id 'org.apache.beam.module'
-}
-
-def bqmsLocation = "$buildDir/libs"
-
-applyJavaNature(
- automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms',
- shadowClosure: {
- dependencies {
-
include(dependency(files("$bqmsLocation/iceberg-bigquery-catalog-1.5.2-0.1.0.jar")))
- }
- relocate 'com.google.guava',
getJavaRelocatedPath('iceberg.bqms.com.google.guava')
- },
- validateShadowJar: false
-)
-
-description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: BigQuery
Metastore"
-ext.summary = "A copy of the BQMS catalog with some popular libraries
relocated."
-
-task downloadBqmsJar(type: Copy) {
- def jarUrl =
'https://storage.googleapis.com/spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-0.1.0.jar'
- def outputDir = file("$bqmsLocation")
- outputDir.mkdirs()
-
- ant.get(src: jarUrl, dest: outputDir)
-}
-
-repositories {
- flatDir {
- dirs "$bqmsLocation"
- }
-}
-
-compileJava.dependsOn downloadBqmsJar
-
-dependencies {
- implementation
files("$bqmsLocation/iceberg-bigquery-catalog-1.5.2-0.1.0.jar")
-}
diff --git a/sdks/java/io/iceberg/build.gradle
b/sdks/java/io/iceberg/build.gradle
index 97e81afe712..3d653d6b276 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -55,7 +55,6 @@ dependencies {
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.hadoop_common
- testImplementation project(path:
":sdks:java:io:iceberg:bigquerymetastore", configuration: "shadow")
testImplementation library.java.hadoop_client
testImplementation library.java.bigdataoss_gcsio
testImplementation library.java.bigdataoss_gcs_connector
@@ -110,11 +109,6 @@ task integrationTest(type: Test) {
outputs.upToDateWhen { false }
include '**/*IT.class'
- // BQ metastore catalog doesn't support java 8
- if (project.findProperty('testJavaVersion') == '8' ||
- JavaVersion.current().equals(JavaVersion.VERSION_1_8)) {
- exclude '**/BigQueryMetastoreCatalogIT.class'
- }
maxParallelForks 4
classpath = sourceSets.test.runtimeClasspath
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.java
deleted file mode 100644
index 7abf2671082..00000000000
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.managed.Managed;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFiles;
-import org.apache.iceberg.ManifestWriter;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.encryption.InputFilesDecryptor;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Integration tests for reading and writing Iceberg tables using the BigQuery
Metastore Catalog.
- */
-public class BigQueryMetastoreCatalogIT {
- private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
- Schema.builder()
- .addStringField("doubly_nested_str")
- .addInt64Field("doubly_nested_float")
- .build();
-
- private static final Schema NESTED_ROW_SCHEMA =
- Schema.builder()
- .addStringField("nested_str")
- .addInt32Field("nested_int")
- .addFloatField("nested_float")
- .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
- .build();
- private static final Schema BEAM_SCHEMA =
- Schema.builder()
- .addStringField("str")
- .addBooleanField("bool")
- .addNullableInt32Field("nullable_int")
- .addNullableInt64Field("nullable_long")
- .addArrayField("arr_long", Schema.FieldType.INT64)
- .addRowField("row", NESTED_ROW_SCHEMA)
- .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
- .build();
-
- private static final SimpleFunction<Long, Row> ROW_FUNC =
- new SimpleFunction<Long, Row>() {
- @Override
- public Row apply(Long num) {
- String strNum = Long.toString(num);
- Row nestedRow =
- Row.withSchema(NESTED_ROW_SCHEMA)
- .addValue("nested_str_value_" + strNum)
- .addValue(Integer.valueOf(strNum))
- .addValue(Float.valueOf(strNum + "." + strNum))
- .addValue(
- Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
- .addValue("doubly_nested_str_value_" + strNum)
- .addValue(num)
- .build())
- .build();
-
- return Row.withSchema(BEAM_SCHEMA)
- .addValue("str_value_" + strNum)
- .addValue(num % 2 == 0)
- .addValue(Integer.valueOf(strNum))
- .addValue(num)
- .addValue(LongStream.range(1, num %
10).boxed().collect(Collectors.toList()))
- .addValue(nestedRow)
- .addValue(num % 2 == 0 ? null : nestedRow)
- .build();
- }
- };
-
- private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
- IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
- private static final SimpleFunction<Row, Record> RECORD_FUNC =
- new SimpleFunction<Row, Record>() {
- @Override
- public Record apply(Row input) {
- return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
- }
- };
-
- @Rule public TestPipeline writePipeline = TestPipeline.create();
-
- @Rule public TestPipeline readPipeline = TestPipeline.create();
-
- private static final String TEST_CATALOG = "beam_test_" + System.nanoTime();
- private static final String DATASET = "iceberg_bigquerymetastore_test_" +
System.nanoTime();
- @Rule public TestName testName = new TestName();
- private static final String WAREHOUSE =
TestPipeline.testingPipelineOptions().getTempLocation();
- private static Catalog catalog;
- private static Map<String, String> catalogProps;
- private TableIdentifier tableIdentifier;
-
- @BeforeClass
- public static void setUp() {
- GcpOptions options =
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
- catalogProps =
- ImmutableMap.<String, String>builder()
- .put("gcp_project", options.getProject())
- .put("gcp_location", "us-central1")
- .put("catalog-impl",
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
- .put("warehouse", WAREHOUSE)
- .build();
- catalog =
- CatalogUtil.loadCatalog(
- catalogProps.get("catalog-impl"), TEST_CATALOG, catalogProps, new
Configuration());
- catalog.initialize(TEST_CATALOG, catalogProps);
- ((SupportsNamespaces) catalog).createNamespace(Namespace.of(DATASET));
- }
-
- @After
- public void cleanup() {
- // We need to cleanup tables first before deleting the dataset
- catalog.dropTable(tableIdentifier);
- }
-
- @AfterClass
- public static void tearDown() {
- ((SupportsNamespaces) catalog).dropNamespace(Namespace.of(DATASET));
- }
-
- private Map<String, Object> getManagedIcebergConfig(TableIdentifier table) {
- return ImmutableMap.<String, Object>builder()
- .put("table", table.toString())
- .put("catalog_name", TEST_CATALOG)
- .put("catalog_properties", catalogProps)
- .build();
- }
-
- @Test
- public void testReadWithBqmsCatalog() throws IOException {
- tableIdentifier =
- TableIdentifier.parse(String.format("%s.%s", DATASET,
testName.getMethodName()));
- Table table = catalog.createTable(tableIdentifier, ICEBERG_SCHEMA);
-
- List<Row> expectedRows =
- LongStream.range(1,
1000).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
- List<Record> records =
-
expectedRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
-
- // write iceberg records with bqms catalog
- String filepath = table.location() + "/" + UUID.randomUUID();
- DataWriter<Record> writer =
- Parquet.writeData(table.io().newOutputFile(filepath))
- .schema(ICEBERG_SCHEMA)
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .overwrite()
- .withSpec(table.spec())
- .build();
- for (Record rec : records) {
- writer.write(rec);
- }
- writer.close();
- AppendFiles appendFiles = table.newAppend();
- String manifestFilename = FileFormat.AVRO.addExtension(filepath +
".manifest");
- OutputFile outputFile = table.io().newOutputFile(manifestFilename);
- ManifestWriter<DataFile> manifestWriter;
- try (ManifestWriter<DataFile> openWriter =
ManifestFiles.write(table.spec(), outputFile)) {
- openWriter.add(writer.toDataFile());
- manifestWriter = openWriter;
- }
- appendFiles.appendManifest(manifestWriter.toManifestFile());
- appendFiles.commit();
-
- // Run Managed Iceberg read
- PCollection<Row> outputRows =
- readPipeline
- .apply(
-
Managed.read(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)))
- .getSinglePCollection();
- PAssert.that(outputRows).containsInAnyOrder(expectedRows);
- readPipeline.run().waitUntilFinish();
- }
-
- @Test
- public void testWriteWithBqmsCatalog() {
- tableIdentifier =
- TableIdentifier.parse(String.format("%s.%s", DATASET,
testName.getMethodName()));
- catalog.createTable(tableIdentifier,
IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA));
-
- List<Row> inputRows =
- LongStream.range(1,
1000).mapToObj(ROW_FUNC::apply).collect(Collectors.toList());
- List<Record> expectedRecords =
-
inputRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
-
- // Run Managed Iceberg write
- writePipeline
- .apply(Create.of(inputRows))
- .setRowSchema(BEAM_SCHEMA)
-
.apply(Managed.write(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)));
- writePipeline.run().waitUntilFinish();
-
- // read back the records and check everything's there
- Table table = catalog.loadTable(tableIdentifier);
- TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA);
- List<Record> writtenRecords = new ArrayList<>();
- for (CombinedScanTask task : tableScan.planTasks()) {
- InputFilesDecryptor descryptor =
- new InputFilesDecryptor(task, table.io(), table.encryption());
- for (FileScanTask fileTask : task.files()) {
- InputFile inputFile = descryptor.getInputFile(fileTask);
- CloseableIterable<Record> iterable =
- Parquet.read(inputFile)
- .split(fileTask.start(), fileTask.length())
- .project(ICEBERG_SCHEMA)
- .createReaderFunc(
- fileSchema ->
GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema))
- .filter(fileTask.residual())
- .build();
-
- for (Record rec : iterable) {
- writtenRecords.add(rec);
- }
- }
- }
- assertThat(expectedRecords, containsInAnyOrder(writtenRecords.toArray()));
- }
-}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1a32a8f111c..65a55885afa 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -369,5 +369,3 @@ include("sdks:java:io:iceberg:hive")
findProject(":sdks:java:io:iceberg:hive")?.name = "hive"
include("sdks:java:io:iceberg:hive:exec")
findProject(":sdks:java:io:iceberg:hive:exec")?.name = "exec"
-include("sdks:java:io:iceberg:bigquerymetastore")
-findProject(":sdks:java:io:iceberg:bigquerymetastore")?.name =
"bigquerymetastore"