This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ac423af5699 Pass-through IcebergIO catalog properties (#31726)
ac423af5699 is described below
commit ac423af5699682c5519ae8d1a3af035ba7a5eab7
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Jul 8 13:15:03 2024 -0400
Pass-through IcebergIO catalog properties (#31726)
* Pass-through iceberg catalog properties
* add to CHANGES.md
* trigger integration test
* remove custom configuration; pass catalog name
---
.../IO_Iceberg_Integration_Tests.json | 3 +-
CHANGES.md | 2 +
.../beam/sdk/io/iceberg/IcebergCatalogConfig.java | 197 +--------------------
.../IcebergReadSchemaTransformProvider.java | 36 ++--
.../IcebergSchemaTransformCatalogConfig.java | 107 -----------
.../IcebergWriteSchemaTransformProvider.java | 42 ++---
.../apache/beam/sdk/io/iceberg/IcebergIOIT.java | 16 +-
.../beam/sdk/io/iceberg/IcebergIOReadTest.java | 11 +-
.../beam/sdk/io/iceberg/IcebergIOWriteTest.java | 31 ++--
.../IcebergReadSchemaTransformProviderTest.java | 34 ++--
.../IcebergSchemaTransformTranslationTest.java | 49 +++--
.../IcebergWriteSchemaTransformProviderTest.java | 34 ++--
.../apache/beam/sdk/io/iceberg/ScanSourceTest.java | 28 ++-
13 files changed, 152 insertions(+), 438 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index a03c067d2c4..1efc8e9e440 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,3 +1,4 @@
{
- "comment": "Modify this file in a trivial way to cause this test suite to
run"
+ "comment": "Modify this file in a trivial way to cause this test suite to
run",
+ "modification": 1
}
diff --git a/CHANGES.md b/CHANGES.md
index 0a620038f11..85f1be48cfb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,12 +67,14 @@
## New Features / Improvements
* Multiple RunInference instances can now share the same model instance by
setting the model_identifier parameter (Python)
([#31665](https://github.com/apache/beam/issues/31665)).
+* [IcebergIO] All specified catalog properties are passed through to the
connector ([#31726](https://github.com/apache/beam/pull/31726))
* Removed a 3rd party LGPL dependency from the Go SDK
([#31765](https://github.com/apache/beam/issues/31765)).
* Support for MapState and SetState when using Dataflow Runner v1 with
Streaming Engine (Java)
([[#18200](https://github.com/apache/beam/issues/18200)])
## Breaking Changes
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog
properties in a key-value fashion
([#31726](https://github.com/apache/beam/pull/31726))
## Deprecations
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index fefef4aa491..2956d75a266 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,214 +19,35 @@ package org.apache.beam.sdk.io.iceberg;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
-import javax.annotation.Nullable;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import java.util.Properties;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.checkerframework.dataflow.qual.Pure;
@AutoValue
public abstract class IcebergCatalogConfig implements Serializable {
-
- @Pure
- public abstract String getName();
-
- /* Core Properties */
- @Pure
- public abstract @Nullable String getIcebergCatalogType();
-
- @Pure
- public abstract @Nullable String getCatalogImplementation();
-
- @Pure
- public abstract @Nullable String getFileIOImplementation();
-
- @Pure
- public abstract @Nullable String getWarehouseLocation();
-
- @Pure
- public abstract @Nullable String getMetricsReporterImplementation();
-
- /* Caching */
- @Pure
- public abstract boolean getCacheEnabled();
-
- @Pure
- public abstract boolean getCacheCaseSensitive();
-
- @Pure
- public abstract long getCacheExpirationIntervalMillis();
-
- @Pure
- public abstract boolean getIOManifestCacheEnabled();
-
- @Pure
- public abstract long getIOManifestCacheExpirationIntervalMillis();
-
- @Pure
- public abstract long getIOManifestCacheMaxTotalBytes();
-
- @Pure
- public abstract long getIOManifestCacheMaxContentLength();
-
- @Pure
- public abstract @Nullable String getUri();
-
- @Pure
- public abstract int getClientPoolSize();
-
- @Pure
- public abstract long getClientPoolEvictionIntervalMs();
-
- @Pure
- public abstract @Nullable String getClientPoolCacheKeys();
-
- @Pure
- public abstract @Nullable String getLockImplementation();
-
- @Pure
- public abstract long getLockHeartbeatIntervalMillis();
-
- @Pure
- public abstract long getLockHeartbeatTimeoutMillis();
-
- @Pure
- public abstract int getLockHeartbeatThreads();
-
- @Pure
- public abstract long getLockAcquireIntervalMillis();
-
- @Pure
- public abstract long getLockAcquireTimeoutMillis();
-
- @Pure
- public abstract @Nullable String getAppIdentifier();
-
- @Pure
- public abstract @Nullable String getUser();
-
@Pure
- public abstract long getAuthSessionTimeoutMillis();
+ public abstract String getCatalogName();
@Pure
- public abstract @Nullable Configuration getConfiguration();
+ public abstract Properties getProperties();
@Pure
public static Builder builder() {
- return new AutoValue_IcebergCatalogConfig.Builder()
- .setIcebergCatalogType(null)
- .setCatalogImplementation(null)
- .setFileIOImplementation(null)
- .setWarehouseLocation(null)
- .setMetricsReporterImplementation(null) // TODO: Set this to our
implementation
- .setCacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT)
- .setCacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT)
-
.setCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
-
.setIOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT)
- .setIOManifestCacheExpirationIntervalMillis(
- CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
- .setIOManifestCacheMaxTotalBytes(
- CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
- .setIOManifestCacheMaxContentLength(
- CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT)
- .setUri(null)
- .setClientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT)
- .setClientPoolEvictionIntervalMs(
- CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)
- .setClientPoolCacheKeys(null)
- .setLockImplementation(null)
-
.setLockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT)
-
.setLockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
-
.setLockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT)
-
.setLockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT)
-
.setLockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
- .setAppIdentifier(null)
- .setUser(null)
-
.setAuthSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT)
- .setConfiguration(null);
- }
-
- @Pure
- public ImmutableMap<String, String> properties() {
- return new PropertyBuilder()
- .put(CatalogUtil.ICEBERG_CATALOG_TYPE, getIcebergCatalogType())
- .put(CatalogProperties.CATALOG_IMPL, getCatalogImplementation())
- .put(CatalogProperties.FILE_IO_IMPL, getFileIOImplementation())
- .put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouseLocation())
- .put(CatalogProperties.METRICS_REPORTER_IMPL,
getMetricsReporterImplementation())
- .put(CatalogProperties.CACHE_ENABLED, getCacheEnabled())
- .put(CatalogProperties.CACHE_CASE_SENSITIVE, getCacheCaseSensitive())
- .put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
getCacheExpirationIntervalMillis())
- .build();
+ return new AutoValue_IcebergCatalogConfig.Builder();
}
public org.apache.iceberg.catalog.Catalog catalog() {
- Configuration conf = getConfiguration();
- if (conf == null) {
- conf = new Configuration();
- }
- return CatalogUtil.buildIcebergCatalog(getName(), properties(), conf);
+ return CatalogUtil.buildIcebergCatalog(
+ getCatalogName(), Maps.fromProperties(getProperties()), new
Configuration());
}
@AutoValue.Builder
public abstract static class Builder {
+ public abstract Builder setCatalogName(String catalogName);
- /* Core Properties */
- public abstract Builder setName(String name);
-
- public abstract Builder setIcebergCatalogType(@Nullable String
icebergType);
-
- public abstract Builder setCatalogImplementation(@Nullable String
catalogImpl);
-
- public abstract Builder setFileIOImplementation(@Nullable String
fileIOImpl);
-
- public abstract Builder setWarehouseLocation(@Nullable String warehouse);
-
- public abstract Builder setMetricsReporterImplementation(@Nullable String
metricsImpl);
-
- /* Caching */
- public abstract Builder setCacheEnabled(boolean cacheEnabled);
-
- public abstract Builder setCacheCaseSensitive(boolean cacheCaseSensitive);
-
- public abstract Builder setCacheExpirationIntervalMillis(long expiration);
-
- public abstract Builder setIOManifestCacheEnabled(boolean enabled);
-
- public abstract Builder setIOManifestCacheExpirationIntervalMillis(long
expiration);
-
- public abstract Builder setIOManifestCacheMaxTotalBytes(long bytes);
-
- public abstract Builder setIOManifestCacheMaxContentLength(long length);
-
- public abstract Builder setUri(@Nullable String uri);
-
- public abstract Builder setClientPoolSize(int size);
-
- public abstract Builder setClientPoolEvictionIntervalMs(long interval);
-
- public abstract Builder setClientPoolCacheKeys(@Nullable String keys);
-
- public abstract Builder setLockImplementation(@Nullable String
lockImplementation);
-
- public abstract Builder setLockHeartbeatIntervalMillis(long interval);
-
- public abstract Builder setLockHeartbeatTimeoutMillis(long timeout);
-
- public abstract Builder setLockHeartbeatThreads(int threads);
-
- public abstract Builder setLockAcquireIntervalMillis(long interval);
-
- public abstract Builder setLockAcquireTimeoutMillis(long timeout);
-
- public abstract Builder setAppIdentifier(@Nullable String id);
-
- public abstract Builder setUser(@Nullable String user);
-
- public abstract Builder setAuthSessionTimeoutMillis(long timeout);
-
- public abstract Builder setConfiguration(@Nullable Configuration conf);
+ public abstract Builder setProperties(Properties props);
public abstract IcebergCatalogConfig build();
}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index fb32e18d937..ef535353efd 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -21,19 +21,21 @@ import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.iceberg.catalog.TableIdentifier;
/**
@@ -47,7 +49,6 @@ public class IcebergReadSchemaTransformProvider extends
TypedSchemaTransformProv
@Override
protected SchemaTransform from(Config configuration) {
- configuration.validate();
return new IcebergReadSchemaTransform(configuration);
}
@@ -68,21 +69,24 @@ public class IcebergReadSchemaTransformProvider extends
TypedSchemaTransformProv
return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
}
+ @SchemaFieldDescription("Identifier of the Iceberg table to write to.")
public abstract String getTable();
- public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
+ @SchemaFieldDescription("Name of the catalog containing the table.")
+ public abstract String getCatalogName();
+
+ @SchemaFieldDescription("Configuration properties used to set up the
Iceberg catalog.")
+ public abstract Map<String, String> getCatalogProperties();
@AutoValue.Builder
public abstract static class Builder {
- public abstract Builder setTable(String tables);
+ public abstract Builder setTable(String table);
- public abstract Builder
setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
+ public abstract Builder setCatalogName(String catalogName);
- public abstract Config build();
- }
+ public abstract Builder setCatalogProperties(Map<String, String>
catalogProperties);
- public void validate() {
- getCatalogConfig().validate();
+ public abstract Config build();
}
}
@@ -109,17 +113,13 @@ public class IcebergReadSchemaTransformProvider extends
TypedSchemaTransformProv
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
- IcebergSchemaTransformCatalogConfig catalogConfig =
configuration.getCatalogConfig();
+ Properties properties = new Properties();
+ properties.putAll(configuration.getCatalogProperties());
IcebergCatalogConfig.Builder catalogBuilder =
-
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
-
- if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
- catalogBuilder =
catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
- }
- if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
- catalogBuilder =
catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
- }
+ IcebergCatalogConfig.builder()
+ .setCatalogName(configuration.getCatalogName())
+ .setProperties(properties);
PCollection<Row> output =
input
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java
deleted file mode 100644
index 87b3d204198..00000000000
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
-import com.google.auto.value.AutoValue;
-import java.util.Set;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.NoSuchSchemaException;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
-import org.apache.beam.sdk.util.Preconditions;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.apache.iceberg.CatalogUtil;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-@DefaultSchema(AutoValueSchema.class)
-@AutoValue
-public abstract class IcebergSchemaTransformCatalogConfig {
- public static Builder builder() {
- return new AutoValue_IcebergSchemaTransformCatalogConfig.Builder();
- }
-
- public abstract String getCatalogName();
-
- @SchemaFieldDescription("Valid types are: {hadoop, hive, rest}")
- public abstract @Nullable String getCatalogType();
-
- public abstract @Nullable String getCatalogImplementation();
-
- public abstract @Nullable String getWarehouseLocation();
-
- @AutoValue.Builder
- public abstract static class Builder {
-
- public abstract Builder setCatalogName(String catalogName);
-
- public abstract Builder setCatalogType(String catalogType);
-
- public abstract Builder setCatalogImplementation(String
catalogImplementation);
-
- public abstract Builder setWarehouseLocation(String warehouseLocation);
-
- public abstract IcebergSchemaTransformCatalogConfig build();
- }
-
- public static final Schema SCHEMA;
-
- static {
- try {
- // To stay consistent with our SchemaTransform configuration naming
conventions,
- // we sort lexicographically and convert field names to snake_case
- SCHEMA =
- SchemaRegistry.createDefault()
- .getSchema(IcebergSchemaTransformCatalogConfig.class)
- .sorted()
- .toSnakeCase();
- } catch (NoSuchSchemaException e) {
- throw new RuntimeException(e);
- }
- }
-
- @SuppressWarnings("argument")
- public Row toRow() {
- return Row.withSchema(SCHEMA)
- .withFieldValue("catalog_name", getCatalogName())
- .withFieldValue("catalog_type", getCatalogType())
- .withFieldValue("catalog_implementation", getCatalogImplementation())
- .withFieldValue("warehouse_location", getWarehouseLocation())
- .build();
- }
-
- public static final Set<String> VALID_CATALOG_TYPES =
- Sets.newHashSet(
- CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
- CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
- CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
-
- public void validate() {
- if (!Strings.isNullOrEmpty(getCatalogType())) {
- checkArgument(
-
VALID_CATALOG_TYPES.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
- "Invalid catalog type. Please pick one of %s",
- VALID_CATALOG_TYPES);
- }
- }
-}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index b490693a9ad..b3de7a88c54 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -21,6 +21,8 @@ import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -39,7 +41,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.iceberg.catalog.TableIdentifier;
/**
@@ -64,7 +65,6 @@ public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformPro
@Override
protected SchemaTransform from(Config configuration) {
- configuration.validate();
return new IcebergWriteSchemaTransform(configuration);
}
@@ -93,20 +93,21 @@ public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformPro
@SchemaFieldDescription("Identifier of the Iceberg table to write to.")
public abstract String getTable();
- @SchemaFieldDescription("Configuration parameters used to set up the
Iceberg catalog.")
- public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
+ @SchemaFieldDescription("Name of the catalog containing the table.")
+ public abstract String getCatalogName();
+
+ @SchemaFieldDescription("Configuration properties used to set up the
Iceberg catalog.")
+ public abstract Map<String, String> getCatalogProperties();
@AutoValue.Builder
public abstract static class Builder {
- public abstract Builder setTable(String tables);
+ public abstract Builder setTable(String table);
- public abstract Builder
setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
+ public abstract Builder setCatalogName(String catalogName);
- public abstract Config build();
- }
+ public abstract Builder setCatalogProperties(Map<String, String>
catalogProperties);
- public void validate() {
- getCatalogConfig().validate();
+ public abstract Config build();
}
}
@@ -133,26 +134,21 @@ public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformPro
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
-
PCollection<Row> rows = input.get(INPUT_TAG);
- IcebergSchemaTransformCatalogConfig catalogConfig =
configuration.getCatalogConfig();
-
- IcebergCatalogConfig.Builder catalogBuilder =
-
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
+ Properties properties = new Properties();
+ properties.putAll(configuration.getCatalogProperties());
- if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
- catalogBuilder =
catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
- }
- if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
- catalogBuilder =
catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
- }
+ IcebergCatalogConfig catalog =
+ IcebergCatalogConfig.builder()
+ .setCatalogName(configuration.getCatalogName())
+ .setProperties(properties)
+ .build();
// TODO: support dynamic destinations
IcebergWriteResult result =
rows.apply(
- IcebergIO.writeRows(catalogBuilder.build())
- .to(TableIdentifier.parse(configuration.getTable())));
+
IcebergIO.writeRows(catalog).to(TableIdentifier.parse(configuration.getTable())));
PCollection<Row> snapshots =
result
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 467a2cbaf24..0420e2f5779 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -206,12 +206,12 @@ public class IcebergIOIT implements Serializable {
Map<String, Object> config =
ImmutableMap.<String, Object>builder()
.put("table", tableId.toString())
+ .put("catalog_name", "test-name")
.put(
- "catalog_config",
+ "catalog_properties",
ImmutableMap.<String, String>builder()
- .put("catalog_name", "hadoop")
- .put("catalog_type",
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .put("warehouse_location", warehouseLocation)
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouseLocation)
.build())
.build();
@@ -246,12 +246,12 @@ public class IcebergIOIT implements Serializable {
Map<String, Object> config =
ImmutableMap.<String, Object>builder()
.put("table", tableId.toString())
+ .put("catalog_name", "test-name")
.put(
- "catalog_config",
+ "catalog_properties",
ImmutableMap.<String, String>builder()
- .put("catalog_name", "hadoop")
- .put("catalog_type",
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .put("warehouse_location", warehouseLocation)
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouseLocation)
.build())
.build();
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 12d86811e60..d6db3f68911 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import java.util.List;
+import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -93,12 +94,12 @@ public class IcebergIOReadTest {
.map(record -> SchemaAndRowConversions.recordToRow(schema, record))
.collect(Collectors.toList());
+ Properties props = new Properties();
+ props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ props.setProperty("warehouse", warehouse.location);
+
IcebergCatalogConfig catalogConfig =
- IcebergCatalogConfig.builder()
- .setName("hadoop")
- .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
- .build();
+
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
PCollection<Row> output =
testPipeline
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index e04eaf48cb3..e0a584ec9da 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -75,12 +76,12 @@ public class IcebergIOWriteTest implements Serializable {
// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
+ Properties props = new Properties();
+ props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ props.setProperty("warehouse", warehouse.location);
+
IcebergCatalogConfig catalog =
- IcebergCatalogConfig.builder()
- .setName("hadoop")
- .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
- .build();
+
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
testPipeline
.apply("Records To Add",
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
@@ -109,12 +110,12 @@ public class IcebergIOWriteTest implements Serializable {
Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);
+ Properties props = new Properties();
+ props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ props.setProperty("warehouse", warehouse.location);
+
IcebergCatalogConfig catalog =
- IcebergCatalogConfig.builder()
- .setName("hadoop")
- .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
- .build();
+
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
DynamicDestinations dynamicDestinations =
new DynamicDestinations() {
@@ -199,12 +200,12 @@ public class IcebergIOWriteTest implements Serializable {
elementsPerTable.computeIfAbsent(tableId, ignored ->
Lists.newArrayList()).add(element);
}
+ Properties props = new Properties();
+ props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ props.setProperty("warehouse", warehouse.location);
+
IcebergCatalogConfig catalog =
- IcebergCatalogConfig.builder()
- .setName("hadoop")
- .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
- .build();
+
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
DynamicDestinations dynamicDestinations =
new DynamicDestinations() {
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
index 46168a487dd..bc15021fa2b 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
@@ -21,6 +21,7 @@ import static
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -52,16 +53,15 @@ public class IcebergReadSchemaTransformProviderTest {
@Test
public void testBuildTransformWithRow() {
- Row catalogConfigRow =
- Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
- .withFieldValue("catalog_name", "test_name")
- .withFieldValue("catalog_type",
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .withFieldValue("warehouse_location", "test_location")
- .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ properties.put("warehouse", "test_location");
+
Row transformConfigRow =
Row.withSchema(new
IcebergReadSchemaTransformProvider().configurationSchema())
.withFieldValue("table", "test_table_identifier")
- .withFieldValue("catalog_config", catalogConfigRow)
+ .withFieldValue("catalog_name", "test-name")
+ .withFieldValue("catalog_properties", properties)
.build();
new IcebergReadSchemaTransformProvider().from(transformConfigRow);
@@ -97,17 +97,15 @@ public class IcebergReadSchemaTransformProviderTest {
.map(record -> SchemaAndRowConversions.recordToRow(schema, record))
.collect(Collectors.toList());
- IcebergSchemaTransformCatalogConfig catalogConfig =
- IcebergSchemaTransformCatalogConfig.builder()
- .setCatalogName("hadoop")
- .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
- .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ properties.put("warehouse", warehouse.location);
IcebergReadSchemaTransformProvider.Config readConfig =
IcebergReadSchemaTransformProvider.Config.builder()
.setTable(identifier)
- .setCatalogConfig(catalogConfig)
+ .setCatalogName("name")
+ .setCatalogProperties(properties)
.build();
PCollection<Row> output =
@@ -158,10 +156,10 @@ public class IcebergReadSchemaTransformProviderTest {
String yamlConfig =
String.format(
"table: %s\n"
- + "catalog_config: \n"
- + " catalog_name: hadoop\n"
- + " catalog_type: %s\n"
- + " warehouse_location: %s",
+ + "catalog_name: test-name\n"
+ + "catalog_properties: \n"
+ + " type: %s\n"
+ + " warehouse: %s",
identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
warehouse.location);
Map<String, Object> configMap = new Yaml().load(yamlConfig);
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
index fb4c98cb0bd..7863f7812a1 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
@@ -25,7 +25,9 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
@@ -42,6 +44,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.ClassRule;
@@ -63,18 +66,19 @@ public class IcebergSchemaTransformTranslationTest {
static final IcebergReadSchemaTransformProvider READ_PROVIDER =
new IcebergReadSchemaTransformProvider();
+ private static final Map<String, String> CATALOG_PROPERTIES =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", "test_location")
+ .build();
+
@Test
public void testReCreateWriteTransformFromRow() {
- Row catalogConfigRow =
- Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
- .withFieldValue("catalog_name", "test_name")
- .withFieldValue("catalog_type",
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .withFieldValue("warehouse_location", "test_location")
- .build();
Row transformConfigRow =
Row.withSchema(WRITE_PROVIDER.configurationSchema())
.withFieldValue("table", "test_table_identifier")
- .withFieldValue("catalog_config", catalogConfigRow)
+ .withFieldValue("catalog_name", "test-name")
+ .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
.build();
IcebergWriteSchemaTransform writeTransform =
(IcebergWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow);
@@ -101,17 +105,11 @@ public class IcebergSchemaTransformTranslationTest {
Collections.singletonList(Row.withSchema(inputSchema).addValue("a").build())))
.setRowSchema(inputSchema);
- Row catalogConfigRow =
- Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
- .withFieldValue("catalog_name", "test_catalog")
- .withFieldValue("catalog_type",
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .withFieldValue("catalog_implementation", "test_implementation")
- .withFieldValue("warehouse_location", warehouse.location)
- .build();
Row transformConfigRow =
Row.withSchema(WRITE_PROVIDER.configurationSchema())
.withFieldValue("table", "test_identifier")
- .withFieldValue("catalog_config", catalogConfigRow)
+ .withFieldValue("catalog_name", "test-name")
+ .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
.build();
IcebergWriteSchemaTransform writeTransform =
@@ -158,16 +156,11 @@ public class IcebergSchemaTransformTranslationTest {
@Test
public void testReCreateReadTransformFromRow() {
// setting a subset of fields here.
- Row catalogConfigRow =
- Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
- .withFieldValue("catalog_name", "test_name")
- .withFieldValue("catalog_type",
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .withFieldValue("warehouse_location", "test_location")
- .build();
Row transformConfigRow =
Row.withSchema(READ_PROVIDER.configurationSchema())
.withFieldValue("table", "test_table_identifier")
- .withFieldValue("catalog_config", catalogConfigRow)
+ .withFieldValue("catalog_name", "test-name")
+ .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
.build();
IcebergReadSchemaTransform readTransform =
@@ -188,19 +181,17 @@ public class IcebergSchemaTransformTranslationTest {
throws InvalidProtocolBufferException, IOException {
// First build a pipeline
Pipeline p = Pipeline.create();
- Row catalogConfigRow =
- Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
- .withFieldValue("catalog_name", "test_catalog")
- .withFieldValue("catalog_type",
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .withFieldValue("warehouse_location", warehouse.location)
- .build();
String identifier = "default.table_" +
Long.toString(UUID.randomUUID().hashCode(), 16);
warehouse.createTable(TableIdentifier.parse(identifier),
TestFixtures.SCHEMA);
+ Map<String, String> properties = new HashMap<>(CATALOG_PROPERTIES);
+ properties.put("warehouse", warehouse.location);
+
Row transformConfigRow =
Row.withSchema(READ_PROVIDER.configurationSchema())
.withFieldValue("table", identifier)
- .withFieldValue("catalog_config", catalogConfigRow)
+ .withFieldValue("catalog_name", "test-name")
+ .withFieldValue("catalog_properties", properties)
.build();
IcebergReadSchemaTransform readTransform =
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 9ef3e9945ec..75884f4bcf7 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -23,6 +23,7 @@ import static
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -61,16 +62,15 @@ public class IcebergWriteSchemaTransformProviderTest {
@Test
public void testBuildTransformWithRow() {
- Row catalogConfigRow =
- Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
- .withFieldValue("catalog_name", "test_name")
- .withFieldValue("catalog_type",
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .withFieldValue("warehouse_location", "test_location")
- .build();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ properties.put("warehouse", "test_location");
+
Row transformConfigRow =
Row.withSchema(new
IcebergWriteSchemaTransformProvider().configurationSchema())
.withFieldValue("table", "test_table_identifier")
- .withFieldValue("catalog_config", catalogConfigRow)
+ .withFieldValue("catalog_name", "test-name")
+ .withFieldValue("catalog_properties", properties)
.build();
new IcebergWriteSchemaTransformProvider().from(transformConfigRow);
@@ -85,15 +85,15 @@ public class IcebergWriteSchemaTransformProviderTest {
// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
+ Map<String, String> properties = new HashMap<>();
+ properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ properties.put("warehouse", warehouse.location);
+
Config config =
Config.builder()
.setTable(identifier)
- .setCatalogConfig(
- IcebergSchemaTransformCatalogConfig.builder()
- .setCatalogName("hadoop")
- .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
- .build())
+ .setCatalogName("name")
+ .setCatalogProperties(properties)
.build();
PCollectionRowTuple input =
@@ -127,10 +127,10 @@ public class IcebergWriteSchemaTransformProviderTest {
String yamlConfig =
String.format(
"table: %s\n"
- + "catalog_config: \n"
- + " catalog_name: hadoop\n"
- + " catalog_type: %s\n"
- + " warehouse_location: %s",
+ + "catalog_name: test-name\n"
+ + "catalog_properties: \n"
+ + " type: %s\n"
+ + " warehouse: %s",
identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
warehouse.location);
Map<String, Object> configMap = new Yaml().load(yamlConfig);
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
index c7d5353428c..143687e3c99 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.iceberg;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.List;
+import java.util.Properties;
import java.util.UUID;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -64,14 +65,17 @@ public class ScanSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
+ Properties props = new Properties();
+ props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ props.setProperty("warehouse", warehouse.location);
+
BoundedSource<Row> source =
new ScanSource(
IcebergScanConfig.builder()
.setCatalogConfig(
IcebergCatalogConfig.builder()
- .setName("hadoop")
-
.setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
+ .setCatalogName("name")
+ .setProperties(props)
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.",
"").split("\\."))
@@ -103,14 +107,17 @@ public class ScanSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
+ Properties props = new Properties();
+ props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ props.setProperty("warehouse", warehouse.location);
+
BoundedSource<Row> source =
new ScanSource(
IcebergScanConfig.builder()
.setCatalogConfig(
IcebergCatalogConfig.builder()
- .setName("hadoop")
-
.setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
+ .setCatalogName("name")
+ .setProperties(props)
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.",
"").split("\\."))
@@ -146,14 +153,17 @@ public class ScanSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
+ Properties props = new Properties();
+ props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ props.setProperty("warehouse", warehouse.location);
+
BoundedSource<Row> source =
new ScanSource(
IcebergScanConfig.builder()
.setCatalogConfig(
IcebergCatalogConfig.builder()
- .setName("hadoop")
-
.setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
- .setWarehouseLocation(warehouse.location)
+ .setCatalogName("name")
+ .setProperties(props)
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.",
"").split("\\."))