This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bb2e0ad37bf Revert "Add support for Iceberg table identifiers with special characters (#33293)" (#33575)
bb2e0ad37bf is described below
commit bb2e0ad37bf521aebaf4839cf989855ae062f2c9
Author: Yi Hu <[email protected]>
AuthorDate: Tue Jan 14 14:40:32 2025 -0500
Revert "Add support for Iceberg table identifiers with special characters (#33293)" (#33575)
This reverts commit d6e0b0c5c0f4cd8acad230833c2ed42e220d6178.
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
sdks/java/io/iceberg/build.gradle | 4 +--
.../beam/sdk/io/iceberg/AppendFilesToTables.java | 3 +-
.../beam/sdk/io/iceberg/FileWriteResult.java | 5 ++-
.../IcebergReadSchemaTransformProvider.java | 3 +-
.../beam/sdk/io/iceberg/IcebergScanConfig.java | 7 ++--
.../apache/beam/sdk/io/iceberg/IcebergUtils.java | 17 ---------
.../io/iceberg/OneTableDynamicDestinations.java | 4 +--
.../io/iceberg/PortableIcebergDestinations.java | 3 +-
.../beam/sdk/io/iceberg/IcebergIOReadTest.java | 24 +++----------
.../beam/sdk/io/iceberg/IcebergUtilsTest.java | 40 ----------------------
11 files changed, 18 insertions(+), 94 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index b73af5e61a4..7ab7bcd9a9c 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
- "modification": 1
+ "modification": 2
}
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 1775dfc5b77..41e12921e6f 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -55,10 +55,8 @@ dependencies {
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
- runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
implementation library.java.hadoop_common
- implementation library.java.jackson_core
- implementation library.java.jackson_databind
+ runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation project(":sdks:java:managed")
testImplementation library.java.hadoop_client
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 72220faf300..fed72a381d5 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.slf4j.Logger;
@@ -132,7 +133,7 @@ class AppendFilesToTables
return;
}
- Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey()));
+ Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
// vast majority of the time, we will simply append data files.
// in the rare case we get a batch that contains multiple partition specs, we will group
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
index d58ac8696d3..bf00bf8519f 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.catalog.TableIdentifierParser;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@AutoValue
@@ -42,7 +41,7 @@ abstract class FileWriteResult {
@SchemaIgnore
public TableIdentifier getTableIdentifier() {
if (cachedTableIdentifier == null) {
- cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString());
+ cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
}
return cachedTableIdentifier;
}
@@ -68,7 +67,7 @@ abstract class FileWriteResult {
@SchemaIgnore
public Builder setTableIdentifier(TableIdentifier tableId) {
- return setTableIdentifierString(TableIdentifierParser.toJson(tableId));
+ return setTableIdentifierString(tableId.toString());
}
public abstract FileWriteResult build();
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index 951442e2c95..d44149fda08 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.catalog.TableIdentifier;
/**
* SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and
@@ -85,7 +86,7 @@ public class IcebergReadSchemaTransformProvider
.getPipeline()
.apply(
IcebergIO.readRows(configuration.getIcebergCatalog())
- .from(IcebergUtils.parseTableIdentifier(configuration.getTable())));
+ .from(TableIdentifier.parse(configuration.getTable())));
return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
index 640283d83c2..60372b172af 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java
@@ -23,7 +23,6 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.expressions.Expression;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -52,9 +51,7 @@ public abstract class IcebergScanConfig implements Serializable {
public Table getTable() {
if (cachedTable == null) {
cachedTable =
- getCatalogConfig()
- .catalog()
- .loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier()));
+ getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
}
return cachedTable;
}
@@ -129,7 +126,7 @@ public abstract class IcebergScanConfig implements Serializable {
public abstract Builder setTableIdentifier(String tableIdentifier);
public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
- return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier));
+ return this.setTableIdentifier(tableIdentifier.toString());
}
public Builder setTableIdentifier(String... names) {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index bd2f743172d..ef19a588136 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -19,9 +19,6 @@ package org.apache.beam.sdk.io.iceberg;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -39,8 +36,6 @@ import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
@@ -52,9 +47,6 @@ import org.joda.time.Instant;
/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
public class IcebergUtils {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
private IcebergUtils() {}
private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
@@ -514,13 +506,4 @@ public class IcebergUtils {
// LocalDateTime, LocalDate, LocalTime
return icebergValue;
}
-
- public static TableIdentifier parseTableIdentifier(String table) {
- try {
- JsonNode jsonNode = OBJECT_MAPPER.readTree(table);
- return TableIdentifierParser.fromJson(jsonNode);
- } catch (JsonProcessingException e) {
- return TableIdentifier.parse(table);
- }
- }
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
index be810aa20a1..861a8ad198a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java
@@ -41,7 +41,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable
@VisibleForTesting
TableIdentifier getTableIdentifier() {
if (tableId == null) {
- tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString));
+ tableId = TableIdentifier.parse(checkStateNotNull(tableIdString));
}
return tableId;
}
@@ -86,6 +86,6 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable
@Override
public void readExternal(ObjectInput in) throws IOException {
tableIdString = in.readUTF();
- tableId = IcebergUtils.parseTableIdentifier(tableIdString);
+ tableId = TableIdentifier.parse(tableIdString);
}
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
index 58f70463bc7..47f661bba3f 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.util.RowStringInterpolator;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;
class PortableIcebergDestinations implements DynamicDestinations {
@@ -72,7 +73,7 @@ class PortableIcebergDestinations implements DynamicDestinations {
@Override
public IcebergDestination instantiateDestination(String dest) {
return IcebergDestination.builder()
- .setTableIdentifier(IcebergUtils.parseTableIdentifier(dest))
+ .setTableIdentifier(TableIdentifier.parse(dest))
.setTableCreateConfig(null)
.setFileFormat(FileFormat.fromString(fileFormat))
.build();
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 1c3f9b53f31..0406ff31e61 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -23,8 +23,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -70,11 +68,11 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@RunWith(Parameterized.class)
+@RunWith(JUnit4.class)
public class IcebergIOReadTest {
private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class);
@@ -85,21 +83,6 @@ public class IcebergIOReadTest {
@Rule public TestPipeline testPipeline = TestPipeline.create();
- @Parameterized.Parameters
- public static Collection<Object[]> data() {
- return Arrays.asList(
- new Object[][] {
- {String.format("{\"namespace\": [\"default\"], \"name\": \"%s\"}", tableId())},
- {String.format("default.%s", tableId())},
- });
- }
-
- public static String tableId() {
- return "table" + Long.toString(UUID.randomUUID().hashCode(), 16);
- }
-
- @Parameterized.Parameter public String tableStringIdentifier;
-
static class PrintRow extends DoFn<Row, Row> {
@ProcessElement
@@ -111,7 +94,8 @@ public class IcebergIOReadTest {
@Test
public void testSimpleScan() throws Exception {
- TableIdentifier tableId = IcebergUtils.parseTableIdentifier(tableStringIdentifier);
+ TableIdentifier tableId =
+ TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
index 918c6b1146e..134f05c34bf 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -19,13 +19,11 @@ package org.apache.beam.sdk.io.iceberg;
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId;
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType;
-import static org.apache.beam.sdk.io.iceberg.IcebergUtils.parseTableIdentifier;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.math.BigDecimal;
@@ -34,7 +32,6 @@ import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
@@ -52,7 +49,6 @@ import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.junit.runners.Parameterized;
/** Test class for {@link IcebergUtils}. */
@RunWith(Enclosed.class)
@@ -806,40 +802,4 @@ public class IcebergUtilsTest {
assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
}
}
-
- @RunWith(Parameterized.class)
- public static class TableIdentifierParseTests {
-
- @Parameterized.Parameters
- public static Collection<Object[]> data() {
- return Arrays.asList(
- new Object[][] {
- {
- "{\"namespace\": [\"dogs\", \"owners.and.handlers\"], \"name\": \"food\"}",
- "dogs.owners.and.handlers.food",
- true
- },
- {"dogs.owners.and.handlers.food", "dogs.owners.and.handlers.food", true},
- {"{\"name\": \"food\"}", "food", true},
- {"{\"table_name\": \"food\"}", "{\"table_name\": \"food\"}", false},
- });
- }
-
- @Parameterized.Parameter public String input;
-
- @Parameterized.Parameter(1)
- public String expected;
-
- @Parameterized.Parameter(2)
- public boolean shouldSucceed;
-
- @Test
- public void test() {
- if (shouldSucceed) {
- assertEquals(expected, parseTableIdentifier(input).toString());
- } else {
- assertThrows(IllegalArgumentException.class, () -> parseTableIdentifier(input));
- }
- }
- }
}