This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4d627b40898 [Iceberg] Add BQMS test that validates using a BQ query
(#33625)
4d627b40898 is described below
commit 4d627b40898f64114aaebeffc7779db38e4a520d
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Jan 24 19:29:22 2025 +0000
[Iceberg] Add BQMS test that validates using a BQ query (#33625)
* add test
* spotless
* another query test
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
sdks/java/io/iceberg/build.gradle | 2 +
.../catalog/BigQueryMetastoreCatalogIT.java | 77 +++++++++++++++++++++-
.../sdk/io/iceberg/catalog/HadoopCatalogIT.java | 2 +-
.../io/iceberg/catalog/IcebergCatalogBaseIT.java | 20 ++++--
5 files changed, 91 insertions(+), 12 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 37dd25bf902..b73af5e61a4 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 3
+ "modification": 1
}
diff --git a/sdks/java/io/iceberg/build.gradle
b/sdks/java/io/iceberg/build.gradle
index d84205791d4..8613a4b22c6 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -84,6 +84,8 @@ dependencies {
// BigQueryMetastore catalog dep
testImplementation project(path: ":sdks:java:io:iceberg:bqms",
configuration: "shadow")
+ testImplementation project(":sdks:java:io:google-cloud-platform")
+ testImplementation library.java.google_api_services_bigquery
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration:
"shadow")
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
index 887b0cfa56c..3a8b47cb5a0 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -17,19 +17,49 @@
*/
package org.apache.beam.sdk.io.iceberg.catalog;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.RowFilter;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
+ private static final BigqueryClient BQ_CLIENT = new
BigqueryClient("BigQueryMetastoreCatalogIT");
static final String BQMS_CATALOG =
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
- static final String DATASET = "managed_iceberg_bqms_tests_no_delete";
+ static final String DATASET = "managed_iceberg_bqms_tests_" +
System.nanoTime();
static final long SALT = System.nanoTime();
+ @BeforeClass
+ public static void createDataset() throws IOException, InterruptedException {
+ BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET);
+ }
+
+ @AfterClass
+ public static void deleteDataset() {
+ BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET);
+ }
+
@Override
public String tableId() {
return DATASET + "." + testName.getMethodName() + "_" + SALT;
@@ -41,7 +71,7 @@ public class BigQueryMetastoreCatalogIT extends
IcebergCatalogBaseIT {
BQMS_CATALOG,
"bqms_" + catalogName,
ImmutableMap.<String, String>builder()
- .put("gcp_project", options.getProject())
+ .put("gcp_project", OPTIONS.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.build(),
@@ -65,7 +95,7 @@ public class BigQueryMetastoreCatalogIT extends
IcebergCatalogBaseIT {
.put(
"catalog_properties",
ImmutableMap.<String, String>builder()
- .put("gcp_project", options.getProject())
+ .put("gcp_project", OPTIONS.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.put("catalog-impl", BQMS_CATALOG)
@@ -73,4 +103,45 @@ public class BigQueryMetastoreCatalogIT extends
IcebergCatalogBaseIT {
.build())
.build();
}
+
+ @Test
+ public void testWriteToPartitionedAndValidateWithBQQuery()
+ throws IOException, InterruptedException {
+ // For an example row where bool=true, datetime=2025-01-01T12:34:56, str=value_303,
+ // this partition spec will create a partition like:
/bool=true/datetime_hour=2025-01-01-12/str_trunc=value_3/
+ PartitionSpec partitionSpec =
+ PartitionSpec.builderFor(ICEBERG_SCHEMA)
+ .identity("bool")
+ .hour("datetime")
+ .truncate("str", "value_x".length())
+ .build();
+ catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA,
partitionSpec);
+
+ // Write with Beam
+ Map<String, Object> config = managedIcebergConfig(tableId());
+ PCollection<Row> input =
pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
+ input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
+ pipeline.run().waitUntilFinish();
+
+ // Fetch records using a BigQuery query and validate
+ BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName());
+ String query = String.format("SELECT * FROM `%s.%s`",
OPTIONS.getProject(), tableId());
+ List<TableRow> rows = bqClient.queryUnflattened(query,
OPTIONS.getProject(), true, true);
+ List<Row> beamRows =
+ rows.stream()
+ .map(tr -> BigQueryUtils.toBeamRow(BEAM_SCHEMA, tr))
+ .collect(Collectors.toList());
+
+ assertThat(beamRows, containsInAnyOrder(inputRows.toArray()));
+
+ String queryByPartition =
+ String.format("SELECT bool, datetime FROM `%s.%s`",
OPTIONS.getProject(), tableId());
+ rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(),
true, true);
+ RowFilter rowFilter = new
RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool", "datetime"));
+ beamRows =
+ rows.stream()
+ .map(tr -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tr))
+ .collect(Collectors.toList());
+ assertThat(beamRows,
containsInAnyOrder(inputRows.stream().map(rowFilter::filter).toArray()));
+ }
}
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
index 4e6766cc496..dc5e3b26324 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
@@ -37,7 +37,7 @@ public class HadoopCatalogIT extends IcebergCatalogBaseIT {
@Override
public Catalog createCatalog() {
Configuration catalogHadoopConf = new Configuration();
- catalogHadoopConf.set("fs.gs.project.id", options.getProject());
+ catalogHadoopConf.set("fs.gs.project.id", OPTIONS.getProject());
catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");
HadoopCatalog catalog = new HadoopCatalog();
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index f11c77ccc68..518470138e9 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -147,7 +147,12 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
@Before
public void setUp() throws Exception {
- options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+ warehouse =
+ String.format(
+ "%s/%s/%s",
+ TestPipeline.testingPipelineOptions().getTempLocation(),
+ getClass().getSimpleName(),
+ RANDOM);
warehouse = warehouse(getClass());
catalogSetup();
catalog = createCatalog();
@@ -162,7 +167,7 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
}
try {
- GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+ GcsUtil gcsUtil = OPTIONS.as(GcsOptions.class).getGcsUtil();
GcsPath path = GcsPath.fromUri(warehouse);
@Nullable
@@ -190,7 +195,8 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
protected static String warehouse;
public Catalog catalog;
- protected static GcpOptions options;
+ protected static final GcpOptions OPTIONS =
+ TestPipeline.testingPipelineOptions().as(GcpOptions.class);
private static final String RANDOM = UUID.randomUUID().toString();
@Rule public TestPipeline pipeline = TestPipeline.create();
@Rule public TestName testName = new TestName();
@@ -210,7 +216,7 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
.addInt32Field("nested_int")
.addFloatField("nested_float")
.build();
- private static final Schema BEAM_SCHEMA =
+ protected static final Schema BEAM_SCHEMA =
Schema.builder()
.addStringField("str")
.addStringField("char")
@@ -262,16 +268,16 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
}
};
- private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
+ protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
- private static final SimpleFunction<Row, Record> RECORD_FUNC =
+ protected static final SimpleFunction<Row, Record> RECORD_FUNC =
new SimpleFunction<Row, Record>() {
@Override
public Record apply(Row input) {
return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
}
};
- private final List<Row> inputRows =
+ protected final List<Row> inputRows =
LongStream.range(0,
numRecords()).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
/** Populates the Iceberg table and Returns a {@link List<Row>} of expected
elements. */