This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 3088b77 HIVE-25613: Port Iceberg Hive fixes to the iceberg module (#2721)
3088b77 is described below
commit 3088b77ab1d0bd9fd69addf5f859b3c175e28dba
Author: pvary <[email protected]>
AuthorDate: Fri Oct 22 14:13:18 2021 +0200
HIVE-25613: Port Iceberg Hive fixes to the iceberg module (#2721)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: Add table-level JVM lock on commits (#2547)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Core: Move ClientPool and ClientPoolImpl to core (#2491)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: Improve code style (#2641)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Style: Delete blank line of CachedClientPool.java (#2787)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: Fix some message typos in HiveCatalog: Matastore => Metastore (#2950)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: Fix toString NPE with recommended constructor (#3021)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Build: Upgrade to Gradle 7.x (#2826)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: Ensure tableLevelMutex is unlocked when uncommitted metadata delete fails (#3264)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Doc: refactor Hive documentation with catalog loading examples (#2544)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: unify catalog experience across engines (#2565)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Core: Add HadoopConfigurable interface to serialize custom FileIO (#2678)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Move Assert.assertTrue(..) instance checks to AssertJ assertions (#2756)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Core: Support multiple specs in OutputFileFactory (#2858)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - MR: Use SerializableTable in IcebergSplit (#2988)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Fix exception message in IcebergInputFormat (#3153)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Data: Fix equality deletes with date/time types (#3135)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Build: Fix ErrorProne UnnecessarilyQualified warnings (#3262)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Build: Fix ErrorProne NewHashMapInt warnings (#3260)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Core: Fail if both Catalog type and catalog-impl are configured (#3162)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - MR: Support imported data in InputFormat using name mapping (#3312)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: Fix Catalog initialization without Configuration (#3252)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: Switch to RetryingHMSClient (allows configuration of retryDelays and retries) (#3099)
* HIVE-25613: Port Iceberg Hive fixes to the iceberg module
Source Iceberg PR - Hive: Fix Catalogs.hiveCatalog method for default catalogs (#3338)
---
iceberg/iceberg-catalog/pom.xml | 6 +-
.../org/apache/iceberg/hive/CachedClientPool.java | 16 +-
.../java/org/apache/iceberg/hive/HiveCatalog.java | 38 +--
.../org/apache/iceberg/hive/HiveClientPool.java | 32 +--
.../org/apache/iceberg/hive/HiveSchemaUtil.java | 6 +-
.../apache/iceberg/hive/HiveTableOperations.java | 40 ++-
.../org/apache/iceberg/hive/TestHiveCatalog.java | 30 +++
.../apache/iceberg/hive/TestHiveClientPool.java | 98 ++++++++
.../apache/iceberg/hive/TestHiveCommitLocks.java | 48 +++-
iceberg/iceberg-handler/pom.xml | 13 +-
.../main/java/org/apache/iceberg/mr/Catalogs.java | 89 ++++---
.../org/apache/iceberg/mr/InputFormatConfig.java | 33 +++
.../org/apache/iceberg/mr/hive/Deserializer.java | 3 +-
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 2 +-
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 18 +-
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 4 +-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 26 +-
.../apache/iceberg/mr/mapreduce/IcebergSplit.java | 51 ++--
.../java/org/apache/iceberg/mr/TestCatalogs.java | 189 +++++++++-----
.../apache/iceberg/mr/TestIcebergInputFormats.java | 9 +-
.../iceberg/mr/TestInputFormatReaderDeletes.java | 2 +
.../iceberg/mr/hive/TestHiveIcebergCTAS.java | 11 +-
.../iceberg/mr/hive/TestHiveIcebergInserts.java | 6 +-
.../iceberg/mr/hive/TestHiveIcebergMigration.java | 28 ++-
.../mr/hive/TestHiveIcebergOutputCommitter.java | 16 +-
.../iceberg/mr/hive/TestHiveIcebergSelects.java | 7 +-
.../iceberg/mr/hive/TestHiveIcebergStatistics.java | 12 +-
.../TestHiveIcebergStorageHandlerLocalScan.java | 3 +-
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 32 +--
...eIcebergStorageHandlerWithMultipleCatalogs.java | 8 +-
.../org/apache/iceberg/mr/hive/TestTables.java | 30 ++-
iceberg/patched-iceberg-core/pom.xml | 18 +-
.../main/java/org/apache/iceberg/CatalogUtil.java | 276 +++++++++++++++++++++
.../main/java/org/apache/iceberg}/ClientPool.java | 4 +-
.../java/org/apache/iceberg}/ClientPoolImpl.java | 17 +-
.../org/apache/iceberg/jdbc/JdbcClientPool.java | 71 ++++++
.../java/org/apache/iceberg/jdbc/JdbcUtil.java | 102 ++++++++
iceberg/pom.xml | 13 +
38 files changed, 1109 insertions(+), 298 deletions(-)
diff --git a/iceberg/iceberg-catalog/pom.xml b/iceberg/iceberg-catalog/pom.xml
index a3af94d..a425eec 100644
--- a/iceberg/iceberg-catalog/pom.xml
+++ b/iceberg/iceberg-catalog/pom.xml
@@ -64,6 +64,10 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
index eca505c..e2dc990 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
@@ -25,13 +25,14 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPool;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.thrift.TException;
-public class CachedClientPool implements ClientPool<HiveMetaStoreClient, TException> {
+public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {
private static Cache<String, HiveClientPool> clientPoolCache;
@@ -40,7 +41,7 @@ public class CachedClientPool implements ClientPool<HiveMetaStoreClient, TExcept
private final int clientPoolSize;
private final long evictionInterval;
- CachedClientPool(Configuration conf, Map<String, String> properties) {
+ public CachedClientPool(Configuration conf, Map<String, String> properties) {
this.conf = conf;
this.metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
this.clientPoolSize = PropertyUtil.propertyAsInt(properties,
@@ -57,7 +58,6 @@ public class CachedClientPool implements ClientPool<HiveMetaStoreClient, TExcept
    return clientPoolCache.get(metastoreUri, k -> new HiveClientPool(clientPoolSize, conf));
}
-
private synchronized void init() {
if (clientPoolCache == null) {
      clientPoolCache = Caffeine.newBuilder().expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
@@ -72,7 +72,13 @@ public class CachedClientPool implements ClientPool<HiveMetaStoreClient, TExcept
}
@Override
-  public <R> R run(Action<R, HiveMetaStoreClient, TException> action) throws TException, InterruptedException {
+  public <R> R run(Action<R, IMetaStoreClient, TException> action) throws TException, InterruptedException {
return clientPool().run(action);
}
+
+ @Override
+  public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
+ throws TException, InterruptedException {
+ return clientPool().run(action, retry);
+ }
}
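For context, a minimal caller sketch (not part of this patch) of the retry-aware run overload added above; it assumes the ClientPool/Action API that this commit relocates into the patched iceberg core module:

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.iceberg.ClientPool;
    import org.apache.iceberg.hive.CachedClientPool;
    import org.apache.thrift.TException;

    public class ClientPoolUsageSketch {
      public static void main(String[] args) throws TException, InterruptedException {
        ClientPool<IMetaStoreClient, TException> clients =
            new CachedClientPool(new Configuration(), Collections.emptyMap());
        // run(action) keeps the old behaviour; run(action, true) lets the underlying
        // HiveClientPool reconnect and retry the action on connection-type failures.
        List<String> databases = clients.run(IMetaStoreClient::getAllDatabases, true);
        databases.forEach(System.out::println);
      }
    }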
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 477e591..880d60d 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -38,6 +38,7 @@ import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
@@ -63,7 +64,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
private String name;
private Configuration conf;
private FileIO fileIO;
- private ClientPool<HiveMetaStoreClient, TException> clients;
+ private ClientPool<IMetaStoreClient, TException> clients;
public HiveCatalog() {
}
@@ -71,9 +72,9 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
/**
* Hive Catalog constructor.
*
+ * @param conf Hadoop Configuration
   * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog. Will be removed in
* v0.13.0
- * @param conf Hadoop Configuration
*/
@Deprecated
public HiveCatalog(Configuration conf) {
@@ -94,6 +95,11 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
@Override
public void initialize(String inputName, Map<String, String> properties) {
this.name = inputName;
+ if (conf == null) {
+      LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
+ this.conf = new Configuration();
+ }
+
if (properties.containsKey(CatalogProperties.URI)) {
      this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI));
}
@@ -235,7 +241,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
        !namespace.isEmpty(),
        "Cannot create namespace with invalid name: %s", namespace);
    Preconditions.checkArgument(isValidateNamespace(namespace),
-        "Cannot support multi part namespace in Hive MetaStore: %s", namespace);
+        "Cannot support multi part namespace in Hive Metastore: %s", namespace);
    try {
      clients.run(client -> {
@@ -250,12 +256,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
          namespace);
    } catch (TException e) {
-      throw new RuntimeException("Failed to create namespace " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to create namespace " + namespace + " in Hive Metastore", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(
-          "Interrupted in call to createDatabase(name) " + namespace + " in Hive MataStore", e);
+          "Interrupted in call to createDatabase(name) " + namespace + " in Hive Metastore", e);
    }
  }
@@ -268,7 +274,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
      return ImmutableList.of();
    }
    try {
-      List<Namespace> namespaces = clients.run(HiveMetaStoreClient::getAllDatabases)
+      List<Namespace> namespaces = clients.run(IMetaStoreClient::getAllDatabases)
          .stream()
          .map(Namespace::of)
          .collect(Collectors.toList());
@@ -277,12 +283,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
      return namespaces;
    } catch (TException e) {
-      throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive Metastore", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(
-          "Interrupted in call to getAllDatabases() " + namespace + " in Hive MataStore", e);
+          "Interrupted in call to getAllDatabases() " + namespace + " in Hive Metastore", e);
    }
  }
@@ -311,12 +317,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
      return false;
    } catch (TException e) {
-      throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(
-          "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive MataStore", e);
+          "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive Metastore", e);
    }
  }
@@ -362,11 +368,11 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
    } catch (TException e) {
      throw new RuntimeException(
-          "Failed to list namespace under namespace: " + namespace + " in Hive MataStore", e);
+          "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
-      throw new RuntimeException("Interrupted in call to getDatabase(name) " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
    }
  }
@@ -386,12 +392,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
      throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
    } catch (TException e) {
-      throw new RuntimeException("Failed to list namespace under namespace: " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(
-          "Interrupted in call to getDatabase(name) " + namespace + " in Hive MataStore", e);
+          "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
    }
  }
@@ -514,7 +520,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("name", name)
-        .add("uri", this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
+        .add("uri", this.conf == null ? "" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
        .toString();
  }
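As the updated @deprecated note and the new initialize() guard suggest, the catalog is now meant to be built in three steps; a brief sketch with illustrative property values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    public class HiveCatalogConstructionSketch {
      public static void main(String[] args) {
        HiveCatalog catalog = new HiveCatalog();
        catalog.setConf(new Configuration());  // optional; initialize() now creates a default Configuration if omitted
        catalog.initialize("hive", ImmutableMap.of(
            CatalogProperties.URI, "thrift://metastore:9083"));  // illustrative metastore URI
        System.out.println(catalog);           // toString() no longer NPEs when conf was never set
      }
    }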
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index ef31e3b..e322792 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -21,35 +21,39 @@ package org.apache.iceberg.hive;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.ClientPoolImpl;
+import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
-public class HiveClientPool extends ClientPoolImpl<HiveMetaStoreClient, TException> {
+public class HiveClientPool extends ClientPoolImpl<IMetaStoreClient, TException> {
-  // use appropriate ctor depending on whether we're working with Hive2 or Hive3 dependencies
-  // we need to do this because there is a breaking API change between Hive2 and Hive3
-  private static final DynConstructors.Ctor<HiveMetaStoreClient> CLIENT_CTOR = DynConstructors.builder()
-      .impl(HiveMetaStoreClient.class, HiveConf.class)
-      .impl(HiveMetaStoreClient.class, Configuration.class)
-      .build();
+  // use appropriate ctor depending on whether we're working with Hive1, Hive2, or Hive3 dependencies
+  // we need to do this because there is a breaking API change between Hive1, Hive2, and Hive3
+  private static final DynMethods.StaticMethod GET_CLIENT = DynMethods.builder("getProxy")
+ .impl(RetryingMetaStoreClient.class, HiveConf.class)
+ .impl(RetryingMetaStoreClient.class, HiveConf.class, Boolean.TYPE)
+ .impl(RetryingMetaStoreClient.class, Configuration.class, Boolean.TYPE)
+ .buildStatic();
private final HiveConf hiveConf;
public HiveClientPool(int poolSize, Configuration conf) {
- super(poolSize, TTransportException.class);
+ // Do not allow retry by default as we rely on RetryingHiveClient
+ super(poolSize, TTransportException.class, false);
this.hiveConf = new HiveConf(conf, HiveClientPool.class);
this.hiveConf.addResource(conf);
}
@Override
- protected HiveMetaStoreClient newClient() {
+ protected IMetaStoreClient newClient() {
try {
try {
- return CLIENT_CTOR.newInstance(hiveConf);
+ return GET_CLIENT.invoke(hiveConf, true);
} catch (RuntimeException e) {
        // any MetaException would be wrapped into RuntimeException during reflection, so let's double-check type here
if (e.getCause() instanceof MetaException) {
@@ -71,7 +75,7 @@ public class HiveClientPool extends ClientPoolImpl<HiveMetaStoreClient, TExcepti
}
@Override
- protected HiveMetaStoreClient reconnect(HiveMetaStoreClient client) {
+ protected IMetaStoreClient reconnect(IMetaStoreClient client) {
try {
client.close();
client.reconnect();
@@ -88,7 +92,7 @@ public class HiveClientPool extends ClientPoolImpl<HiveMetaStoreClient, TExcepti
}
@Override
- protected void close(HiveMetaStoreClient client) {
+ protected void close(IMetaStoreClient client) {
client.close();
}
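The DynMethods lookup above resolves RetryingMetaStoreClient.getProxy at runtime; under Hive 3 dependencies it is roughly equivalent to the direct call below (a sketch, not the patched code):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class RetryingClientSketch {
      static IMetaStoreClient newClient(HiveConf hiveConf) throws MetaException {
        // getProxy returns a proxy that retries metastore calls itself using the configured
        // retry delay and retry count, which is why the pool above now disables its own retries
        // (super(poolSize, TTransportException.class, false)).
        return RetryingMetaStoreClient.getProxy(hiveConf, true);
      }
    }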
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index e6c6cf5..57123e1 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -262,6 +262,8 @@ public final class HiveSchemaUtil {
case DATE:
return "date";
case TIME:
+ case STRING:
+ case UUID:
return "string";
case TIMESTAMP:
Types.TimestampType timestampType = (Types.TimestampType) type;
@@ -269,11 +271,7 @@ public final class HiveSchemaUtil {
return "timestamp with local time zone";
}
return "timestamp";
- case STRING:
- case UUID:
- return "string";
case FIXED:
- return "binary";
case BINARY:
return "binary";
case DECIMAL:
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 386d9ff..0ade33f 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.hive;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
@@ -28,12 +30,14 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.LockComponent;
@@ -48,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.TableMetadata;
@@ -82,14 +87,18 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";
  private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
  private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
  private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
  private static final DynMethods.UnboundMethod ALTER_TABLE = DynMethods.builder("alter_table")
- .impl(HiveMetaStoreClient.class, "alter_table_with_environmentContext",
+ .impl(IMetaStoreClient.class, "alter_table_with_environmentContext",
String.class, String.class, Table.class, EnvironmentContext.class)
- .impl(HiveMetaStoreClient.class, "alter_table",
+ .impl(IMetaStoreClient.class, "alter_table",
String.class, String.class, Table.class, EnvironmentContext.class)
+ .impl(IMetaStoreClient.class, "alter_table",
+ String.class, String.class, Table.class)
.build();
  private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
      // gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things but with different names
@@ -99,6 +108,16 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
  // Should be in org.apache.iceberg.hadoop.ConfigProperties, but that is not ported to Hive codebase
public static final String KEEP_HIVE_STATS = "iceberg.hive.keep.stats";
+ private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+ if (commitLockCache == null) {
+ commitLockCache = Caffeine.newBuilder()
+ .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+ .build();
+ }
+ }
+
/**
   * Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some
   * properties control the same behaviour but are named differently in Iceberg and Hive. Therefore changes to these
@@ -130,7 +149,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
private final long lockCheckMinWaitTime;
private final long lockCheckMaxWaitTime;
private final FileIO fileIO;
- private final ClientPool<HiveMetaStoreClient, TException> metaClients;
+ private final ClientPool<IMetaStoreClient, TException> metaClients;
  protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO fileIO,
                                String catalogName, String database, String table) {
@@ -146,6 +165,9 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
        conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
    this.lockCheckMaxWaitTime =
        conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+ initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
}
@Override
@@ -195,6 +217,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
    CommitStatus commitStatus = CommitStatus.FAILURE;
    boolean updateHiveTable = false;
    Optional<Long> lockId = Optional.empty();
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same
+    // JVM process, which would result in unnecessary and costly HMS lock acquisition requests
+    ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
+ tableLevelMutex.lock();
try {
lockId = Optional.of(acquireLock());
      // TODO add lock heart beating for cases where default lock timeout is too low.
@@ -275,7 +301,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
throw new RuntimeException("Interrupted during commit", e);
} finally {
- cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
+      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
    }
  }
@@ -457,7 +483,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
return lockId;
}
-  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId) {
+  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId,
+                                        ReentrantLock tableLevelMutex) {
    try {
      if (commitStatus == CommitStatus.FAILURE) {
        // If we are sure the commit failed, clean up the uncommitted metadata file
@@ -468,6 +495,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
throw e;
} finally {
unlock(lockId);
+ tableLevelMutex.unlock();
}
}
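Condensed to its essentials, the commit path now serializes same-table commits inside the JVM before ever talking to HMS; a simplified sketch of the pattern introduced above:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    public class TableLevelLockSketch {
      // one fair lock per fully qualified table name, shared by all operations in this JVM
      private static final Cache<String, ReentrantLock> COMMIT_LOCKS = Caffeine.newBuilder()
          .expireAfterAccess(10, TimeUnit.MINUTES)   // mirrors the iceberg.hive.table-level-lock-evict-ms default
          .build();

      void commit(String fullTableName) {
        ReentrantLock tableLevelMutex = COMMIT_LOCKS.get(fullTableName, t -> new ReentrantLock(true));
        tableLevelMutex.lock();                      // block concurrent commits to the same table locally
        try {
          // acquire the HMS lock, write the new metadata file, alter the HMS table ...
        } finally {
          // release the HMS lock first, then the JVM-level mutex, even if metadata cleanup throws
          tableLevelMutex.unlock();
        }
      }
    }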
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index d4741a1..9518540 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
@@ -45,6 +46,7 @@ import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import org.junit.rules.TemporaryFolder;
import static org.apache.iceberg.NullOrder.NULLS_FIRST;
@@ -119,6 +121,34 @@ public class TestHiveCatalog extends HiveMetastoreTest {
}
@Test
+ public void testInitialize() {
+ Assertions.assertDoesNotThrow(() -> {
+ HiveCatalog catalog = new HiveCatalog();
+ catalog.initialize("hive", Maps.newHashMap());
+ });
+ }
+
+ @Test
+ public void testToStringWithoutSetConf() {
+ Assertions.assertDoesNotThrow(() -> {
+ HiveCatalog catalog = new HiveCatalog();
+ catalog.toString();
+ });
+ }
+
+ @Test
+ public void testInitializeCatalogWithProperties() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("uri", "thrift://examplehost:9083");
+ properties.put("warehouse", "/user/hive/testwarehouse");
+ HiveCatalog catalog = new HiveCatalog();
+ catalog.initialize("hive", properties);
+
+    Assert.assertEquals(catalog.getConf().get("hive.metastore.uris"), "thrift://examplehost:9083");
+    Assert.assertEquals(catalog.getConf().get("hive.metastore.warehouse.dir"), "/user/hive/testwarehouse");
+ }
+
+ @Test
public void testCreateTableTxnBuilder() throws Exception {
Schema schema = new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
index b7870af..36996e3 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
@@ -23,9 +23,23 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestHiveClientPool {
@@ -38,6 +52,20 @@ public class TestHiveClientPool {
" </property>\n" +
"</configuration>\n";
+ HiveClientPool clients;
+
+ @Before
+ public void before() {
+ HiveClientPool clientPool = new HiveClientPool(2, new Configuration());
+ clients = Mockito.spy(clientPool);
+ }
+
+ @After
+ public void after() {
+ clients.close();
+ clients = null;
+ }
+
@Test
public void testConf() {
HiveConf conf = createHiveConf();
@@ -65,4 +93,74 @@ public class TestHiveClientPool {
}
return hiveConf;
}
+
+ @Test
+ public void testNewClientFailure() {
+    Mockito.doThrow(new RuntimeException("Connection exception")).when(clients).newClient();
+    AssertHelpers.assertThrows("Should throw exception", RuntimeException.class,
+        "Connection exception", () -> clients.run(Object::toString));
+ }
+
+ @Test
+  public void testGetTablesFailsForNonReconnectableException() throws Exception {
+ HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+ Mockito.doReturn(hmsClient).when(clients).newClient();
+ Mockito.doThrow(new MetaException("Another meta exception"))
+        .when(hmsClient).getTables(Mockito.anyString(), Mockito.anyString());
+    AssertHelpers.assertThrows("Should throw exception", MetaException.class,
+        "Another meta exception", () -> clients.run(client -> client.getTables("default", "t")));
+ }
+
+ @Test
+ public void testConnectionFailureRestoreForMetaException() throws Exception {
+ HiveMetaStoreClient hmsClient = newClient();
+
+ // Throwing an exception may trigger the client to reconnect.
+    String metaMessage = "Got exception: org.apache.thrift.transport.TTransportException";
+    Mockito.doThrow(new MetaException(metaMessage)).when(hmsClient).getAllDatabases();
+
+ // Create a new client when the reconnect method is called.
+ HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+ List<String> databases = Lists.newArrayList("db1", "db2");
+
+ Mockito.doReturn(databases).when(newClient).getAllDatabases();
+ // The return is OK when the reconnect method is called.
+    Assert.assertEquals(databases, clients.run(client -> client.getAllDatabases(), true));
+
+ // Verify that the method is called.
+ Mockito.verify(clients).reconnect(hmsClient);
+ Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+ }
+
+ @Test
+  public void testConnectionFailureRestoreForTTransportException() throws Exception {
+    HiveMetaStoreClient hmsClient = newClient();
+    Mockito.doThrow(new TTransportException()).when(hmsClient).getAllFunctions();
+
+ // Create a new client when getAllFunctions() failed.
+ HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+ GetAllFunctionsResponse response = new GetAllFunctionsResponse();
+ response.addToFunctions(
+        new Function("concat", "db1", "classname", "root", PrincipalType.USER, 100, FunctionType.JAVA, null));
+    Mockito.doReturn(response).when(newClient).getAllFunctions();
+
+    Assert.assertEquals(response, clients.run(client -> client.getAllFunctions(), true));
+
+ Mockito.verify(clients).reconnect(hmsClient);
+ Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+ }
+
+ private HiveMetaStoreClient newClient() {
+ HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+ Mockito.doReturn(hmsClient).when(clients).newClient();
+ return hmsClient;
+ }
+
+ private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) {
+ HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class);
+ Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient);
+ return newClient;
+ }
}
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 531e511..3b4bb15 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -20,9 +20,15 @@
package org.apache.iceberg.hive;
import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.iceberg.AssertHelpers;
@@ -42,7 +48,11 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestHiveCommitLocks extends HiveTableBaseTest {
@@ -50,8 +60,8 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
private static HiveClientPool spyClientPool = null;
private static CachedClientPool spyCachedClientPool = null;
  private static Configuration overriddenHiveConf = new Configuration(hiveConf);
-  private static AtomicReference<HiveMetaStoreClient> spyClientRef = new AtomicReference<>();
-  private static HiveMetaStoreClient spyClient = null;
+  private static AtomicReference<IMetaStoreClient> spyClientRef = new AtomicReference<>();
+ private static IMetaStoreClient spyClient = null;
HiveTableOperations ops = null;
TableMetadata metadataV1 = null;
TableMetadata metadataV2 = null;
@@ -71,12 +81,13 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
    // The spy clients are reused between methods and closed at the end of all tests in this class.
    spyClientPool = spy(new HiveClientPool(1, overriddenHiveConf));
    when(spyClientPool.newClient()).thenAnswer(invocation -> {
-      HiveMetaStoreClient client = (HiveMetaStoreClient) invocation.callRealMethod();
- spyClientRef.set(spy(client));
+ // cannot spy on RetryingHiveMetastoreClient as it is a proxy
+ IMetaStoreClient client = spy(new HiveMetaStoreClient(hiveConf));
+ spyClientRef.set(client);
return spyClientRef.get();
});
-    spyClientPool.run(HiveMetaStoreClient::isLocalMetaStore); // To ensure new client is created.
+    spyClientPool.run(IMetaStoreClient::isLocalMetaStore); // To ensure new client is created.
    spyCachedClientPool = spy(new CachedClientPool(hiveConf, Collections.emptyMap()));
    when(spyCachedClientPool.clientPool()).thenAnswer(invocation -> spyClientPool);
@@ -210,4 +221,31 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
"Could not acquire the lock on",
() -> spyOps.doCommit(metadataV2, metadataV1));
}
+
+ @Test
+  public void testTableLevelProcessLockBlocksConcurrentHMSRequestsForSameTable() throws Exception {
+ int numConcurrentCommits = 10;
+ // resetting the spy client to forget about prior call history
+ reset(spyClient);
+
+ // simulate several concurrent commit operations on the same table
+    ExecutorService executor = Executors.newFixedThreadPool(numConcurrentCommits);
+    IntStream.range(0, numConcurrentCommits).forEach(i -> executor.submit(() -> {
+ try {
+ spyOps.doCommit(metadataV2, metadataV1);
+ } catch (CommitFailedException e) {
+ // failures are expected here when checking the base version
+          // it's no problem, we're not testing the actual commit success here, only the HMS lock acquisition attempts
+ }
+ }));
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+
+ // intra-process commits to the same table should be serialized now
+    // i.e. no thread should receive WAITING state from HMS and have to call checkLock periodically
+    verify(spyClient, never()).checkLock(any(Long.class));
+    // all threads eventually got their turn
+    verify(spyClient, times(numConcurrentCommits)).lock(any(LockRequest.class));
+ }
}
diff --git a/iceberg/iceberg-handler/pom.xml b/iceberg/iceberg-handler/pom.xml
index c60e0ef..87c21fa 100644
--- a/iceberg/iceberg-handler/pom.xml
+++ b/iceberg/iceberg-handler/pom.xml
@@ -97,10 +97,15 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>test</scope>
- </dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index 710150a..ccd3c6e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.mr;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -35,35 +34,46 @@ import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
/**
 * Class for catalog resolution and accessing the common functions for {@link Catalog} API.
 * <p>
- * Catalog resolution happens in this order:
- * <ol>
- *   <li>Custom catalog if specified by {@link InputFormatConfig#CATALOG_LOADER_CLASS}
- *   <li>Hadoop or Hive catalog if specified by {@link InputFormatConfig#CATALOG}
- *   <li>Hadoop Tables
- * </ol>
+ * If the catalog name is provided, get the catalog type from iceberg.catalog.<code>catalogName</code>.type config.
+ * <p>
+ * In case the catalog name is {@link #ICEBERG_HADOOP_TABLE_NAME location_based_table},
+ * type is ignored and tables will be loaded using {@link HadoopTables}.
+ * <p>
+ * In case the value of catalog type is null, iceberg.catalog.<code>catalogName</code>.catalog-impl config
+ * is used to determine the catalog implementation class.
+ * <p>
+ * If catalog name is null, get the catalog type from {@link InputFormatConfig#CATALOG iceberg.mr.catalog} config:
+ * <ul>
+ *   <li>hive: HiveCatalog</li>
+ *   <li>location: HadoopTables</li>
+ *   <li>hadoop: HadoopCatalog</li>
+ * </ul>
+ * <p>
+ * In case the value of catalog type is null,
+ * {@link InputFormatConfig#CATALOG_LOADER_CLASS iceberg.mr.catalog.loader.class} is used to determine
+ * the catalog implementation class.
+ * <p>
+ * Note: null catalog name mode is only supported for backwards compatibility. Using this mode is NOT RECOMMENDED.
*/
public final class Catalogs {
public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg";
  public static final String ICEBERG_HADOOP_TABLE_NAME = "location_based_table";
- private static final String HIVE_CATALOG_TYPE = "hive";
- private static final String HADOOP_CATALOG_TYPE = "hadoop";
- private static final String NO_CATALOG_TYPE = "no catalog";
-
public static final String NAME = "name";
public static final String LOCATION = "location";
+ private static final String NO_CATALOG_TYPE = "no catalog";
  private static final Set<String> PROPERTIES_TO_REMOVE =
      ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME,
          InputFormatConfig.CATALOG_NAME);
@@ -141,7 +151,7 @@ public final class Catalogs {
String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
// Create a table property map without the controlling properties
- Map<String, String> map = new HashMap<>(props.size());
+ Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size());
for (Object key : props.keySet()) {
if (!PROPERTIES_TO_REMOVE.contains(key)) {
map.put(key.toString(), props.get(key).toString());
@@ -193,16 +203,20 @@ public final class Catalogs {
*/
public static boolean hiveCatalog(Configuration conf, Properties props) {
String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
-    return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(getCatalogType(conf, catalogName));
+    String catalogType = getCatalogType(conf, catalogName);
+    if (catalogType != null) {
+      return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType);
+    }
+    catalogType = getCatalogType(conf, ICEBERG_DEFAULT_CATALOG_NAME);
+    if (catalogType != null) {
+      return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType);
+    }
+    return getCatalogProperties(conf, catalogName, catalogType).get(CatalogProperties.CATALOG_IMPL) == null;
}
@VisibleForTesting
  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) {
String catalogType = getCatalogType(conf, catalogName);
- if (catalogType == null) {
-      throw new NoSuchNamespaceException("Catalog definition for %s is not found.", catalogName);
- }
-
if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) {
return Optional.empty();
} else {
@@ -238,10 +252,18 @@ public final class Catalogs {
*/
  private static Map<String, String> addCatalogPropertiesIfMissing(Configuration conf, String catalogType,
                                                                   Map<String, String> catalogProperties) {
-    catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType);
-    if (catalogType.equalsIgnoreCase(HADOOP_CATALOG_TYPE)) {
-      catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION,
-          conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION));
+    if (catalogType != null) {
+      catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType);
+    }
+
+    String legacyCatalogImpl = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
+    if (legacyCatalogImpl != null) {
+      catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, legacyCatalogImpl);
+    }
+
+    String legacyWarehouseLocation = conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
+    if (legacyWarehouseLocation != null) {
+      catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, legacyWarehouseLocation);
}
return catalogProperties;
}
@@ -249,35 +271,24 @@ public final class Catalogs {
/**
* Return the catalog type based on the catalog name.
* <p>
- * If the catalog name is provided get the catalog type from 'iceberg.catalog.<code>catalogName</code>.type' config.
- * In case the value of this property is null, return with no catalog definition (Hadoop Table)
- * </p>
- * <p>
- * If catalog name is null, check the global conf for 'iceberg.mr.catalog' property. If the value of the property is:
- * <ul>
- * <li>null/hive -> Hive Catalog</li>
- * <li>location -> Hadoop Table</li>
- * <li>hadoop -> Hadoop Catalog</li>
- * <li>any other value -> Custom Catalog</li>
- * </ul>
- * </p>
+ * See {@link Catalogs} documentation for catalog type resolution strategy.
+ *
* @param conf global hive configuration
* @param catalogName name of the catalog
* @return type of the catalog, can be null
*/
  private static String getCatalogType(Configuration conf, String catalogName) {
    if (catalogName != null) {
-      String catalogType = conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName));
-      if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME) || catalogType == null) {
+ String catalogType = conf.get(InputFormatConfig.catalogPropertyConfigKey(
+ catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+ if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) {
return NO_CATALOG_TYPE;
} else {
return catalogType;
}
} else {
String catalogType = conf.get(InputFormatConfig.CATALOG);
- if (catalogType == null) {
- return HIVE_CATALOG_TYPE;
- } else if (catalogType.equals(LOCATION)) {
+ if (catalogType != null && catalogType.equals(LOCATION)) {
return NO_CATALOG_TYPE;
} else {
return catalogType;
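A hedged configuration example of the resolution rules described in the rewritten Catalogs javadoc above (catalog names, URI, and class name are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class CatalogResolutionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Named Hive catalog: iceberg.catalog.<name>.type drives the resolution.
        conf.set("iceberg.catalog.prod_hive.type", "hive");
        conf.set("iceberg.catalog.prod_hive.uri", "thrift://metastore:9083");
        // Custom catalog: when no type is set, iceberg.catalog.<name>.catalog-impl is consulted.
        conf.set("iceberg.catalog.custom.catalog-impl", "com.example.MyCatalog");
        // The reserved name location_based_table bypasses catalogs and uses HadoopTables.
      }
    }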
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index e8e2163..32b777b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -51,9 +51,30 @@ public class InputFormatConfig {
  public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
  public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
public static final String LOCALITY = "iceberg.mr.locality";
+
+ /**
+ * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+   * with config key {@link org.apache.iceberg.CatalogUtil#ICEBERG_CATALOG_TYPE} to specify the type of a catalog.
+ */
+ @Deprecated
public static final String CATALOG = "iceberg.mr.catalog";
+
+ /**
+ * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+   * with config key {@link org.apache.iceberg.CatalogProperties#WAREHOUSE_LOCATION}
+ * to specify the warehouse location of a catalog.
+ */
+ @Deprecated
  public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
+
+ /**
+ * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+ * with config key {@link org.apache.iceberg.CatalogProperties#CATALOG_IMPL}
+ * to specify the implementation of a catalog.
+ */
+ @Deprecated
  public static final String CATALOG_LOADER_CLASS = "iceberg.mr.catalog.loader.class";
+
public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
@@ -211,6 +232,18 @@ public class InputFormatConfig {
    return readColumns.split(conf.get(serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA)));
}
+ /**
+ * Get Hadoop config key of a catalog property based on catalog name
+ * @param catalogName catalog name
+ * @param catalogProperty catalog property, can be any custom property,
+ * a commonly used list of properties can be found
+ * at {@link org.apache.iceberg.CatalogProperties}
+ * @return Hadoop config key of a catalog property for the catalog name
+ */
+  public static String catalogPropertyConfigKey(String catalogName, String catalogProperty) {
+    return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, catalogProperty);
+ }
+
private static Schema schema(Configuration conf, String key) {
String json = conf.get(key);
return json == null ? null : SchemaParser.fromJson(json);
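A small usage sketch of the new helper, assuming the CatalogProperties/CatalogUtil constants from Iceberg core (the catalog name and warehouse path are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.CatalogUtil;
    import org.apache.iceberg.mr.InputFormatConfig;

    public class CatalogConfigKeySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // catalogPropertyConfigKey("prod_hive", "type") -> "iceberg.catalog.prod_hive.type"
        conf.set(InputFormatConfig.catalogPropertyConfigKey("prod_hive", CatalogUtil.ICEBERG_CATALOG_TYPE), "hive");
        conf.set(InputFormatConfig.catalogPropertyConfigKey("prod_hive", CatalogProperties.WAREHOUSE_LOCATION),
            "/warehouse/path");  // illustrative warehouse location
      }
    }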
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
index 458affd..47e9f3e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.hive.serde.objectinspector.WriteObjectInspector;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types.ListType;
@@ -232,7 +233,7 @@ class Deserializer {
    FixNameMappingObjectInspectorPair(Schema schema, ObjectInspectorPair pair) {
super(pair.writerInspector(), pair.sourceInspector());
- this.sourceNameMap = new HashMap<>(schema.columns().size());
+      this.sourceNameMap = Maps.newHashMapWithExpectedSize(schema.columns().size());
      List<? extends StructField> fields = ((StructObjectInspector) sourceInspector()).getAllStructFieldRefs();
for (int i = 0; i < schema.columns().size(); ++i) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 6fc8cc6..fc66890 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -116,7 +116,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
// bogus cast for favouring code reuse over syntax
return (RecordReader) HIVE_VECTORIZED_RECORDREADER_CTOR.newInstance(
- new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>(),
+ new IcebergInputFormat<>(),
icebergSplit,
job,
reporter);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 9a3c54a..c7951ff 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -39,9 +39,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.mapred.Container;
@@ -78,15 +76,15 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
    long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
FileIO io = table.io();
- LocationProvider location = table.locationProvider();
- EncryptionManager encryption = table.encryption();
- OutputFileFactory outputFileFactory =
-        new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
-            taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+    int partitionId = taskAttemptID.getTaskID().getId();
+    int taskId = taskAttemptID.getId();
+    String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID();
+    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+ .format(fileFormat)
+ .operationId(operationId)
+ .build();
String tableName = jc.get(Catalogs.NAME);
-    HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
+    return new HiveIcebergRecordWriter(schema, spec, fileFormat,
        new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID, tableName);
-
- return writer;
}
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 6bd4214..c215e48 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.mr.hive;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -56,6 +55,7 @@ import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +70,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
private ObjectInspector inspector;
private Schema tableSchema;
private Collection<String> partitionColumns;
- private Map<ObjectInspector, Deserializer> deserializers = new HashMap<>(1);
+  private Map<ObjectInspector, Deserializer> deserializers = Maps.newHashMapWithExpectedSize(1);
private Container<Record> row = new Container<>();
@Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index c6525aa..fdba3df 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -66,6 +67,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
@@ -136,13 +138,14 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
    InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
        InputFormatConfig.InMemoryDataModel.GENERIC);
    try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
+ Table serializableTable = SerializableTable.copyOf(table);
tasksIterable.forEach(task -> {
if (applyResidual && (model ==
InputFormatConfig.InMemoryDataModel.HIVE ||
model == InputFormatConfig.InMemoryDataModel.PIG)) {
// TODO: We do not support residual evaluation for HIVE and PIG in
memory data model yet
checkResiduals(task);
}
- splits.add(new IcebergSplit(conf, task, table.io(),
table.encryption()));
+ splits.add(new IcebergSplit(serializableTable, conf, task));
});
} catch (IOException e) {
throw new UncheckedIOException(String.format("Failed to close table
scan: %s", scan), e);
@@ -190,6 +193,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
private TaskAttemptContext context;
private Schema tableSchema;
private Schema expectedSchema;
+ private String nameMapping;
private boolean reuseContainers;
private boolean caseSensitive;
private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
@@ -205,10 +209,12 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
// For now IcebergInputFormat does its own split planning and does not
accept FileSplit instances
CombinedScanTask task = ((IcebergSplit) split).task();
this.context = newContext;
- this.io = ((IcebergSplit) split).io();
- this.encryptionManager = ((IcebergSplit) split).encryptionManager();
+ Table table = ((IcebergSplit) split).table();
+ this.io = table.io();
+ this.encryptionManager = table.encryption();
this.tasks = task.files().iterator();
this.tableSchema = InputFormatConfig.tableSchema(conf);
+ this.nameMapping =
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE,
InputFormatConfig.CASE_SENSITIVE_DEFAULT);
this.expectedSchema = readSchema(conf, tableSchema, caseSensitive);
this.reuseContainers =
conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
@@ -333,6 +339,9 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
if (reuseContainers) {
avroReadBuilder.reuseContainers();
}
+ if (nameMapping != null) {
+ avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
switch (inMemoryDataModel) {
case PIG:
@@ -357,6 +366,9 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
if (reuseContainers) {
parquetReadBuilder.reuseContainers();
}
+ if (nameMapping != null) {
+ parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
switch (inMemoryDataModel) {
case PIG:
@@ -380,8 +392,8 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
// ORC does not support reuse containers yet
switch (inMemoryDataModel) {
case PIG:
- // TODO: implement value readers for Pig and Hive
- throw new UnsupportedOperationException("ORC support not yet
supported for Pig and Hive");
+ // TODO: implement value readers for Pig
+ throw new UnsupportedOperationException("ORC support not yet
supported for Pig");
case HIVE:
if (MetastoreUtil.hive3PresentOnClasspath()) {
orcIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile,
task, idToConstant, context);
@@ -398,6 +410,10 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
orcReadBuilder.createReaderFunc(
fileSchema -> GenericOrcReader.buildReader(
readSchema, fileSchema, idToConstant));
+
+ if (nameMapping != null) {
+ orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
orcIterator = orcReadBuilder.build();
}
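
Two ported changes meet in this file: splits now carry a serializable copy of the table, and the Avro/Parquet/ORC readers apply the table's default name mapping so imported files without Iceberg field IDs can still be projected by name. A condensed sketch, assuming the Iceberg APIs referenced in the hunk (SerializableTable.copyOf, NameMappingParser.fromJson, and withNameMapping on the read builders named as in the diff):

    // Planning side: one serializable table snapshot is shared by all splits of the job.
    Table serializableTable = SerializableTable.copyOf(table);
    // splits.add(new IcebergSplit(serializableTable, conf, task));

    // Read side: the split's table provides IO, encryption and the default name mapping.
    String nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
    if (nameMapping != null) {
      parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
    }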
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 43b78d6..8bc332e 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -26,11 +26,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.util.SerializationUtil;
@@ -40,9 +37,8 @@ public class IcebergSplit extends InputSplit implements
org.apache.hadoop.mapred
public static final String[] ANYWHERE = new String[]{"*"};
+ private Table table;
private CombinedScanTask task;
- private FileIO io;
- private EncryptionManager encryptionManager;
private transient String[] locations;
private transient Configuration conf;
@@ -51,11 +47,10 @@ public class IcebergSplit extends InputSplit implements
org.apache.hadoop.mapred
public IcebergSplit() {
}
- IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io,
EncryptionManager encryptionManager) {
+ IcebergSplit(Table table, Configuration conf, CombinedScanTask task) {
+ this.table = table;
this.task = task;
this.conf = conf;
- this.io = io;
- this.encryptionManager = encryptionManager;
}
public CombinedScanTask task() {
@@ -88,45 +83,27 @@ public class IcebergSplit extends InputSplit implements
org.apache.hadoop.mapred
@Override
public void write(DataOutput out) throws IOException {
+ byte[] tableData = SerializationUtil.serializeToBytes(table);
+ out.writeInt(tableData.length);
+ out.write(tableData);
+
byte[] data = SerializationUtil.serializeToBytes(this.task);
out.writeInt(data.length);
out.write(data);
-
- byte[] ioData;
- if (io instanceof HadoopFileIO) {
- SerializableConfiguration serializableConf = new
SerializableConfiguration(((HadoopFileIO) io).conf());
- ioData = SerializationUtil.serializeToBytes(new
HadoopFileIO(serializableConf::get));
- } else {
- ioData = SerializationUtil.serializeToBytes(io);
- }
- out.writeInt(ioData.length);
- out.write(ioData);
-
- byte[] encryptionManagerData =
SerializationUtil.serializeToBytes(encryptionManager);
- out.writeInt(encryptionManagerData.length);
- out.write(encryptionManagerData);
}
@Override
public void readFields(DataInput in) throws IOException {
+ byte[] tableData = new byte[in.readInt()];
+ in.readFully(tableData);
+ this.table = SerializationUtil.deserializeFromBytes(tableData);
+
byte[] data = new byte[in.readInt()];
in.readFully(data);
this.task = SerializationUtil.deserializeFromBytes(data);
-
- byte[] ioData = new byte[in.readInt()];
- in.readFully(ioData);
- this.io = SerializationUtil.deserializeFromBytes(ioData);
-
- byte[] encryptionManagerData = new byte[in.readInt()];
- in.readFully(encryptionManagerData);
- this.encryptionManager =
SerializationUtil.deserializeFromBytes(encryptionManagerData);
- }
-
- public FileIO io() {
- return io;
}
- public EncryptionManager encryptionManager() {
- return encryptionManager;
+ public Table table() {
+ return table;
}
}
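
With the table on the split, Writable serialization reduces to two length-prefixed byte arrays (table, then task); FileIO and EncryptionManager are recovered from the deserialized table instead of being shipped separately. Reassembled from the wrapped hunk, the resulting pair looks roughly like:

    @Override
    public void write(DataOutput out) throws IOException {
      // Serialize the table first, then the combined scan task, each prefixed by its length.
      byte[] tableData = SerializationUtil.serializeToBytes(table);
      out.writeInt(tableData.length);
      out.write(tableData);

      byte[] taskData = SerializationUtil.serializeToBytes(task);
      out.writeInt(taskData.length);
      out.write(taskData);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      // Read back in the same order: table, then task.
      byte[] tableData = new byte[in.readInt()];
      in.readFully(tableData);
      this.table = SerializationUtil.deserializeFromBytes(tableData);

      byte[] taskData = new byte[in.readInt()];
      in.readFully(taskData);
      this.task = SerializationUtil.deserializeFromBytes(taskData);
    }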
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
index 04168eb..9b3ee40 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -38,6 +39,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -191,91 +193,149 @@ public class TestCatalogs {
}
@Test
- public void testLoadCatalog() throws IOException {
- conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
- Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
-
- String nonExistentCatalogType = "fooType";
+ public void testLegacyLoadCatalogDefault() {
+ Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, null);
+ Assert.assertTrue(defaultCatalog.isPresent());
+ Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class);
+ Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties()));
+ }
- conf.set(InputFormatConfig.CATALOG, nonExistentCatalogType);
- AssertHelpers.assertThrows(
- "should complain about catalog not supported",
UnsupportedOperationException.class,
- "Unknown catalog type", () -> Catalogs.loadCatalog(conf, null));
+ @Test
+ public void testLegacyLoadCatalogHive() {
+ conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+ Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null);
+ Assert.assertTrue(hiveCatalog.isPresent());
+ Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class);
+ Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties()));
+ }
+ @Test
+ public void testLegacyLoadCatalogHadoop() {
conf.set(InputFormatConfig.CATALOG,
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
"/tmp/mylocation");
Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, null);
-
Assert.assertTrue(hadoopCatalog.isPresent());
- Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
-
- conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
- Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null);
+ Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
+ Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
+ }
- Assert.assertTrue(hiveCatalog.isPresent());
- Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
+ @Test
+ public void testLegacyLoadCatalogCustom() {
+ conf.set(InputFormatConfig.CATALOG_LOADER_CLASS,
CustomHadoopCatalog.class.getName());
+ conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
"/tmp/mylocation");
+ Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, null);
+ Assert.assertTrue(customHadoopCatalog.isPresent());
+ Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class);
+ Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
+ }
+ @Test
+ public void testLegacyLoadCatalogLocation() {
conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
+ }
- // arbitrary catalog name with non existent catalog type
- String catalogName = "barCatalog";
- conf.unset(InputFormatConfig.CATALOG);
- conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalogName), nonExistentCatalogType);
+ @Test
+ public void testLegacyLoadCatalogUnknown() {
+ conf.set(InputFormatConfig.CATALOG, "fooType");
AssertHelpers.assertThrows(
- "should complain about catalog not supported",
UnsupportedOperationException.class,
- "Unknown catalog type:", () -> Catalogs.loadCatalog(conf,
catalogName));
+ "should complain about catalog not supported",
UnsupportedOperationException.class,
+ "Unknown catalog type", () -> Catalogs.loadCatalog(conf, null));
+ }
- // arbitrary catalog name with hadoop catalog type and default warehouse
location
- conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalogName),
- CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+ @Test
+ public void testLoadCatalogDefault() {
+ String catalogName = "barCatalog";
+ Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, catalogName);
+ Assert.assertTrue(defaultCatalog.isPresent());
+ Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class);
+ Properties properties = new Properties();
+ properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+ Assert.assertTrue(Catalogs.hiveCatalog(conf, properties));
+ }
- Assert.assertTrue(hadoopCatalog.isPresent());
- Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
+ @Test
+ public void testLoadCatalogHive() {
+ String catalogName = "barCatalog";
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogUtil.ICEBERG_CATALOG_TYPE),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+ Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
+ Assert.assertTrue(hiveCatalog.isPresent());
+ Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class);
+ Properties properties = new Properties();
+ properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+ Assert.assertTrue(Catalogs.hiveCatalog(conf, properties));
+ }
- // arbitrary catalog name with hadoop catalog type and provided warehouse
location
- conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalogName),
- CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE,
catalogName), "/tmp/mylocation");
- hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+ @Test
+ public void testLegacyLoadCustomCatalogWithHiveCatalogTypeSet() {
+ String catalogName = "barCatalog";
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogUtil.ICEBERG_CATALOG_TYPE),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+ conf.set(InputFormatConfig.CATALOG_LOADER_CLASS,
CustomHadoopCatalog.class.getName());
+ conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
"/tmp/mylocation");
+ AssertHelpers.assertThrows("Should complain about both configs being set",
IllegalArgumentException.class,
+ "both type and catalog-impl are set", () ->
Catalogs.loadCatalog(conf, catalogName));
+ }
+ @Test
+ public void testLoadCatalogHadoop() {
+ String catalogName = "barCatalog";
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogUtil.ICEBERG_CATALOG_TYPE),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogProperties.WAREHOUSE_LOCATION),
+ "/tmp/mylocation");
+ Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(hadoopCatalog.isPresent());
- Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
+ Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
Assert.assertEquals("HadoopCatalog{name=barCatalog,
location=/tmp/mylocation}", hadoopCatalog.get().toString());
+ Properties properties = new Properties();
+ properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+ Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
+ }
- // arbitrary catalog name with hive catalog type
- conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalogName),
- CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
- hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
-
- Assert.assertTrue(hiveCatalog.isPresent());
- Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
-
- // arbitrary catalog name with custom catalog type without specific
classloader
- conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalogName), "custom");
- AssertHelpers.assertThrows(
- "should complain about catalog not supported",
UnsupportedOperationException.class,
- "Unknown catalog type:", () -> Catalogs.loadCatalog(conf,
catalogName));
+ @Test
+ public void testLoadCatalogHadoopWithLegacyWarehouseLocation() {
+ String catalogName = "barCatalog";
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogUtil.ICEBERG_CATALOG_TYPE),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
"/tmp/mylocation");
+ Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+ Assert.assertTrue(hadoopCatalog.isPresent());
+ Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
+ Assert.assertEquals("HadoopCatalog{name=barCatalog,
location=/tmp/mylocation}", hadoopCatalog.get().toString());
+ Properties properties = new Properties();
+ properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+ Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
+ }
- // arbitrary catalog name with custom catalog type and provided classloader
- conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalogName), "custom");
- conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE,
catalogName), CustomHadoopCatalog.class.getName());
+ @Test
+ public void testLoadCatalogCustom() {
+ String catalogName = "barCatalog";
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogProperties.CATALOG_IMPL),
+ CustomHadoopCatalog.class.getName());
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogProperties.WAREHOUSE_LOCATION),
+ "/tmp/mylocation");
Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf,
catalogName);
-
Assert.assertTrue(customHadoopCatalog.isPresent());
- Assert.assertTrue(customHadoopCatalog.get() instanceof
CustomHadoopCatalog);
-
- // arbitrary catalog name with location catalog type
- conf.unset(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalogName));
- Assert.assertFalse(Catalogs.loadCatalog(conf, catalogName).isPresent());
+ Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class);
+ Properties properties = new Properties();
+ properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+ Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
+ }
- // default catalog configuration
- conf.unset(InputFormatConfig.CATALOG);
- hiveCatalog = Catalogs.loadCatalog(conf, null);
+ @Test
+ public void testLoadCatalogLocation() {
+ Assert.assertFalse(Catalogs.loadCatalog(conf,
Catalogs.ICEBERG_HADOOP_TABLE_NAME).isPresent());
+ }
- Assert.assertTrue(hiveCatalog.isPresent());
- Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
+ @Test
+ public void testLoadCatalogUnknown() {
+ String catalogName = "barCatalog";
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogUtil.ICEBERG_CATALOG_TYPE), "fooType");
+ AssertHelpers.assertThrows(
+ "should complain about catalog not supported",
UnsupportedOperationException.class,
+ "Unknown catalog type:", () -> Catalogs.loadCatalog(conf,
catalogName));
}
public static class CustomHadoopCatalog extends HadoopCatalog {
@@ -291,9 +351,10 @@ public class TestCatalogs {
}
private void setCustomCatalogProperties(String catalogName, String
warehouseLocation) {
- conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE,
catalogName), warehouseLocation);
- conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE,
catalogName), CustomHadoopCatalog.class.getName());
- conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalogName), "custom");
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogProperties.WAREHOUSE_LOCATION),
+ warehouseLocation);
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName,
CatalogProperties.CATALOG_IMPL),
+ CustomHadoopCatalog.class.getName());
conf.set(InputFormatConfig.CATALOG_NAME, catalogName);
}
}
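
The rewritten tests target the unified per-catalog configuration: each named catalog is described by keys built with InputFormatConfig.catalogPropertyConfigKey (catalog type, warehouse location, catalog-impl), and setting both a type and a catalog-impl now fails fast. A hedged sketch of loading a named Hadoop catalog this way, with the names taken from the tests above:

    Configuration conf = new Configuration();
    String catalogName = "barCatalog";
    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE),
        CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
    conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION),
        "/tmp/mylocation");

    // Expected: a HadoopCatalog named "barCatalog"; Catalogs.hiveCatalog(conf, props) reports false.
    Optional<Catalog> catalog = Catalogs.loadCatalog(conf, catalogName);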
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index 6f3cbee..bdc1014 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
@@ -351,10 +352,10 @@ public class TestIcebergInputFormats {
String warehouseLocation =
temp.newFolder("hadoop_catalog").getAbsolutePath();
conf.set("warehouse.location", warehouseLocation);
conf.set(InputFormatConfig.CATALOG_NAME,
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
- conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME),
- CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE,
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME),
- warehouseLocation);
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
+ CatalogUtil.ICEBERG_CATALOG_TYPE), CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ conf.set(InputFormatConfig.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
+ CatalogProperties.WAREHOUSE_LOCATION), warehouseLocation);
Catalog catalog = new HadoopCatalog(conf, conf.get("warehouse.location"));
TableIdentifier identifier = TableIdentifier.of("db", "t");
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index ef964f8..2ba4e50 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
@@ -104,6 +105,7 @@ public class TestInputFormatReaderDeletes extends
DeleteReadTests {
.filter(recordFactory -> recordFactory.name().equals(inputFormat))
.map(recordFactory ->
recordFactory.create(builder.project(projected).conf()).getRecords())
.flatMap(List::stream)
+ .map(record -> new
InternalRecordWrapper(projected.asStruct()).wrap(record))
.collect(Collectors.toList())
);
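
The added map step exists because equality-delete comparisons happen on Iceberg's internal representations of date/time values; wrapping each expected record with InternalRecordWrapper converts those columns before the records are collected for comparison. A minimal sketch of the same idea:

    // Wrap each expected record so date/time fields compare in their internal form
    // (matching what the readers return) before adding them to the expected set.
    StructLikeSet expected = StructLikeSet.create(projected.asStruct());
    records.forEach(record -> expected.add(new InternalRecordWrapper(projected.asStruct()).wrap(record)));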
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
index 72a04a4..14760ee 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Assume;
@@ -45,9 +46,10 @@ public class TestHiveIcebergCTAS extends
HiveIcebergStorageHandlerWithEngineBase
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS,
TableIdentifier.of("default", "source"), false));
shell.executeStatement(String.format(
- "CREATE TABLE target STORED BY ICEBERG %s TBLPROPERTIES ('%s'='%s') AS
SELECT * FROM source",
+ "CREATE TABLE target STORED BY ICEBERG %s %s AS SELECT * FROM source",
testTables.locationForCreateTableSQL(TableIdentifier.of("default",
"target")),
- TableProperties.DEFAULT_FILE_FORMAT, fileFormat));
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT,
fileFormat.toString()))));
List<Object[]> objects = shell.executeStatement("SELECT * FROM target
ORDER BY id");
HiveIcebergTestUtils.validateData(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS,
@@ -64,8 +66,9 @@ public class TestHiveIcebergCTAS extends
HiveIcebergStorageHandlerWithEngineBase
shell.executeStatement(String.format(
"CREATE TABLE target PARTITIONED BY (dept, name) " +
- "STORED BY ICEBERG TBLPROPERTIES ('%s'='%s') AS SELECT * FROM
source",
- TableProperties.DEFAULT_FILE_FORMAT, fileFormat));
+ "STORED BY ICEBERG %s AS SELECT * FROM source",
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT,
fileFormat.toString()))));
// check table can be read back correctly
List<Object[]> objects = shell.executeStatement("SELECT id, name, dept
FROM target ORDER BY id");
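
The CTAS statements now delegate the whole TBLPROPERTIES clause to testTables.propertiesForCreateTableSQL, so every test table also carries the catalog name of the TestTables flavour under test. The generated statement ends up roughly like the following (property values are illustrative):

    String sql = String.format(
        "CREATE TABLE target STORED BY ICEBERG %s %s AS SELECT * FROM source",
        testTables.locationForCreateTableSQL(TableIdentifier.of("default", "target")),
        testTables.propertiesForCreateTableSQL(
            ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString())));
    // e.g. ... TBLPROPERTIES ('write.format.default'='orc', 'iceberg.catalog'='<catalog>') AS SELECT ...
    shell.executeStatement(sql);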
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 3143d32..5222361 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -305,10 +306,11 @@ public class TestHiveIcebergInserts extends
HiveIcebergStorageHandlerWithEngineB
// create Iceberg table without specifying a write format in the tbl
properties
// it should fall back to using the default file format
- shell.executeStatement(String.format("CREATE EXTERNAL TABLE %s (id bigint,
name string) STORED BY '%s' %s",
+ shell.executeStatement(String.format("CREATE EXTERNAL TABLE %s (id bigint,
name string) STORED BY '%s' %s %s",
identifier,
HiveIcebergStorageHandler.class.getName(),
- testTables.locationForCreateTableSQL(identifier)));
+ testTables.locationForCreateTableSQL(identifier),
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of())));
shell.executeStatement(String.format("INSERT INTO %s VALUES (10,
'Linda')", identifier));
List<Object[]> results = shell.executeStatement(String.format("SELECT *
FROM %s", identifier));
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java
index 205f9c0..c8de0d1 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Assume;
@@ -51,7 +52,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat ==
FileFormat.PARQUET);
String tableName = "tbl";
String createQuery = "CREATE EXTERNAL TABLE " + tableName + " (a int)
STORED AS " + fileFormat.name() + " " +
- testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName));
+ testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of());
shell.executeStatement(createQuery);
shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2),
(3)");
validateMigration(tableName);
@@ -62,7 +64,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat ==
FileFormat.PARQUET);
String tableName = "tbl_part";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int)
PARTITIONED BY (b string) STORED AS " +
- fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
+ fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa')
VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb')
VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc')
VALUES (6)");
@@ -76,7 +79,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
String tableName = "tbl_part_bucketed";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int)
PARTITIONED BY (b string) clustered by " +
"(a) INTO 2 BUCKETS STORED AS " + fileFormat.name() + " " +
- testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)));
+ testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa')
VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb')
VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc')
VALUES (6)");
@@ -89,7 +93,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat ==
FileFormat.PARQUET);
String tableName = "tbl_rollback";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int)
STORED AS " + fileFormat.name() + " " +
- testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)));
+ testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2),
(3)");
validateMigrationRollback(tableName);
}
@@ -99,7 +104,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat ==
FileFormat.PARQUET);
String tableName = "tbl_rollback";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int)
PARTITIONED BY (b string) STORED AS " +
- fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
+ fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa')
VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb')
VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc')
VALUES (6)");
@@ -113,7 +119,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
String tableName = "tbl_rollback";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int)
PARTITIONED BY (b string, c int) " +
"STORED AS " + fileFormat.name() + " " +
- testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)));
+ testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa',
c='111') VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb',
c='111') VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa',
c='222') VALUES (6)");
@@ -127,7 +134,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
String tableName = "tbl_part_bucketed";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int)
PARTITIONED BY (b string) clustered by " +
"(a) INTO 2 BUCKETS STORED AS " + fileFormat.name() + " " +
- testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)));
+ testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa')
VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb')
VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc')
VALUES (6)");
@@ -144,7 +152,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
List<String> formats = ImmutableList.of("TEXTFILE", "JSONFILE", "RCFILE",
"SEQUENCEFILE");
formats.forEach(format -> {
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int)
STORED AS " + format + " " +
- testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)));
+ testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2),
(3)");
AssertHelpers.assertThrows("Migrating a " + format + " table to Iceberg
should have thrown an exception.",
IllegalArgumentException.class, "Cannot convert hive table to
iceberg with input format: ",
@@ -161,7 +170,8 @@ public class TestHiveIcebergMigration extends
HiveIcebergStorageHandlerWithEngin
testTableType == TestTables.TestTableType.HIVE_CATALOG);
String tableName = "tbl_unsupported";
shell.executeStatement("CREATE MANAGED TABLE " + tableName + " (a int)
STORED AS " + fileFormat + " " +
- testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)));
+ testTables.locationForCreateTableSQL(TableIdentifier.of("default",
tableName)) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2),
(3)");
AssertHelpers.assertThrows("Migrating a managed table to Iceberg should
have thrown an exception.",
IllegalArgumentException.class, "Converting non-external, temporary or
transactional hive table to iceberg",
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index af8c383..7b4dce5 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -41,10 +41,8 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
@@ -266,18 +264,20 @@ public class TestHiveIcebergOutputCommitter {
Table table = HiveIcebergStorageHandler.table(conf, name);
FileIO io = table.io();
- LocationProvider location = table.locationProvider();
- EncryptionManager encryption = table.encryption();
Schema schema = HiveIcebergStorageHandler.schema(conf);
PartitionSpec spec = table.spec();
for (int i = 0; i < taskNum; ++i) {
List<Record> records = TestHelper.generateRandomRecords(schema,
RECORD_NUM, i + attemptNum);
TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(),
JOB_ID.getId(), TaskType.MAP, i, attemptNum);
- OutputFileFactory outputFileFactory =
- new OutputFileFactory(spec, FileFormat.PARQUET, location, io,
encryption, taskId.getTaskID().getId(),
- attemptNum, QUERY_ID + "-" + JOB_ID);
- HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema,
spec, FileFormat.PARQUET,
+ int partitionId = taskId.getTaskID().getId();
+ String operationId = QUERY_ID + "-" + JOB_ID;
+ FileFormat fileFormat = FileFormat.PARQUET;
+ OutputFileFactory outputFileFactory =
OutputFileFactory.builderFor(table, partitionId, attemptNum)
+ .format(fileFormat)
+ .operationId(operationId)
+ .build();
+ HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema,
spec, fileFormat,
new GenericAppenderFactory(schema), outputFileFactory, io,
TARGET_FILE_SIZE,
TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
index da21af3..84e8b57 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
@@ -26,9 +26,8 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.mr.Catalogs;
-import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
@@ -171,9 +170,9 @@ public class TestHiveIcebergSelects extends
HiveIcebergStorageHandlerWithEngineB
// note: the Chinese character seems to be accepted in the column name,
but not
// in the table name - this is the case for both Iceberg and standard Hive
tables.
shell.executeStatement(String.format(
- "CREATE TABLE `%s` (id bigint, `dep,! 是,t` string) STORED BY ICEBERG
STORED AS %s %s TBLPROPERTIES ('%s'='%s')",
+ "CREATE TABLE `%s` (id bigint, `dep,! 是,t` string) STORED BY ICEBERG
STORED AS %s %s %s",
table.name(), fileFormat, testTables.locationForCreateTableSQL(table),
- InputFormatConfig.CATALOG_NAME,
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME));
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of())));
shell.executeStatement(String.format("INSERT INTO `%s` VALUES (1, 'moon'),
(2, 'star')", table.name()));
List<Object[]> result = shell.executeStatement(String.format(
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
index d601100..4dca587 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Assume;
@@ -147,9 +148,10 @@ public class TestHiveIcebergStatistics extends
HiveIcebergStorageHandlerWithEngi
shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
true);
shell.executeStatement(String.format(
- "CREATE TABLE target STORED BY ICEBERG %s TBLPROPERTIES ('%s'='%s') AS
SELECT * FROM source",
+ "CREATE TABLE target STORED BY ICEBERG %s %s AS SELECT * FROM source",
testTables.locationForCreateTableSQL(TableIdentifier.of("default",
"target")),
- TableProperties.DEFAULT_FILE_FORMAT, fileFormat));
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT,
fileFormat.toString()))));
checkColStat("target", "id", true);
checkColStatMinMaxValue("target", "id", 0, 2);
@@ -165,9 +167,9 @@ public class TestHiveIcebergStatistics extends
HiveIcebergStorageHandlerWithEngi
shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
true);
shell.executeStatement(String.format(
- "CREATE TABLE target PARTITIONED BY (dept, name) " +
- "STORED BY ICEBERG TBLPROPERTIES ('%s'='%s') AS SELECT * FROM
source s",
- TableProperties.DEFAULT_FILE_FORMAT, fileFormat));
+ "CREATE TABLE target PARTITIONED BY (dept, name) STORED BY ICEBERG %s
AS SELECT * FROM source s",
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT,
fileFormat.toString()))));
checkColStat("target", "id", true);
checkColStat("target", "dept", true);
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
index dcadacf..218114b 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -233,7 +232,7 @@ public class TestHiveIcebergStorageHandlerLocalScan {
"TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" +
PartitionSpecParser.toJson(spec) + "', " +
"'" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', "
+
- "'" + InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')";
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')";
runCreateAndReadTest(identifier, createSql,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index ec0cd4e..fc90722 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -175,7 +175,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
testTables.locationForCreateTableSQL(identifier) +
" TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(schema) + "', " +
- "'" + InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
shell.executeStatement("ALTER TABLE " + identifier + " SET PARTITION SPEC
(month(ts))");
@@ -219,7 +219,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(schema) + "', " +
- "'" + InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
PartitionSpec spec = PartitionSpec.builderFor(schema)
.year("year_field")
@@ -280,7 +280,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
testTables.locationForCreateTableSQL(identifier) +
" TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(schema) + "', " +
- "'" + InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
Table table = testTables.loadTable(identifier);
Assert.assertEquals(spec, table.spec());
}
@@ -297,7 +297,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
"'" + InputFormatConfig.PARTITION_SPEC + "'='" +
PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
"'dummy'='test', " +
- "'" + InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
// Check the Iceberg table data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -377,7 +377,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
"STRING) STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')",
testTables.locationForCreateTableSQL(identifier),
InputFormatConfig.CATALOG_NAME,
- Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
+ testTables.catalogName());
shell.executeStatement(query);
Assert.assertNotNull(testTables.loadTable(identifier));
}
@@ -391,7 +391,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
"orc",
testTables.locationForCreateTableSQL(identifier),
InputFormatConfig.CATALOG_NAME,
- Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
+ testTables.catalogName());
shell.executeStatement(query);
Table table = testTables.loadTable(identifier);
Assert.assertNotNull(table);
@@ -407,7 +407,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','"
+
- InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() +
"')");
// Check the Iceberg table partition data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -425,7 +425,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', "
+
"'" + InputFormatConfig.PARTITION_SPEC + "'='" +
PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
- "'" + InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
// Check the Iceberg table partition data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -442,7 +442,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', "
+
"'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE', " +
- "'" + InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
org.apache.hadoop.hive.metastore.api.Table hmsTable =
shell.metastore().getTable("default", "customers");
Properties tableProperties = new Properties();
@@ -514,7 +514,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA +
"'='WrongSchema'" +
- ",'" + InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ ",'" + InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
}
);
@@ -536,7 +536,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
"STORED BY ICEBERG " +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','"
+
- InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
}
);
}
@@ -556,7 +556,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
"STORED BY ICEBERG " +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','
" +
- InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
+ InputFormatConfig.CATALOG_NAME + "'='" +
testTables.catalogName() + "')");
}
);
} else {
@@ -579,8 +579,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
"PARTITIONED BY (first_name STRING) " +
"STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default",
"customers")) +
- " TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" +
- PartitionSpecParser.toJson(spec) + "')");
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of(InputFormatConfig.PARTITION_SPEC,
PartitionSpecParser.toJson(spec))));
}
);
}
@@ -791,7 +791,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
InputFormatConfig.TABLE_SCHEMA,
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA),
InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC),
"custom_property", "initial_val",
- InputFormatConfig.CATALOG_NAME,
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME));
+ InputFormatConfig.CATALOG_NAME, testTables.catalogName()));
// Check the Iceberg table parameters
@@ -985,7 +985,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
shell.executeStatement("CREATE EXTERNAL TABLE customers (id int, name
string) STORED BY ICEBERG " +
testTables.locationForCreateTableSQL(identifier) + " TBLPROPERTIES ('"
+
- InputFormatConfig.CATALOG_NAME + "'='" +
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "', " +
+ InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() +
"', " +
"'" + TableProperties.FORMAT_VERSION + "'='2')");
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
index 2a0e33a..f32fb10 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
@@ -118,9 +118,9 @@ public class
TestHiveIcebergStorageHandlerWithMultipleCatalogs {
@Test
public void testJoinTablesFromDifferentCatalogs() throws IOException {
- createAndAddRecords(testTables1, fileFormat1,
TableIdentifier.of("default", "customers1"), table1CatalogName,
+ createAndAddRecords(testTables1, fileFormat1,
TableIdentifier.of("default", "customers1"),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
- createAndAddRecords(testTables2, fileFormat2,
TableIdentifier.of("default", "customers2"), table2CatalogName,
+ createAndAddRecords(testTables2, fileFormat2,
TableIdentifier.of("default", "customers2"),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
List<Object[]> rows = shell.executeStatement("SELECT c2.customer_id,
c2.first_name, c2.last_name " +
@@ -164,14 +164,14 @@ public class
TestHiveIcebergStorageHandlerWithMultipleCatalogs {
}
private void createAndAddRecords(TestTables testTables, FileFormat
fileFormat, TableIdentifier identifier,
- String catalogName, List<Record> records)
throws IOException {
+ List<Record> records) throws IOException {
String createSql = String.format(
"CREATE EXTERNAL TABLE %s (customer_id BIGINT, first_name STRING,
last_name STRING)" +
" STORED BY ICEBERG %s " +
" TBLPROPERTIES ('%s'='%s', '%s'='%s')",
identifier,
testTables.locationForCreateTableSQL(identifier),
- InputFormatConfig.CATALOG_NAME, catalogName,
+ InputFormatConfig.CATALOG_NAME, testTables.catalogName(),
TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
shell.executeStatement(createSql);
Table icebergTable = testTables.loadTable(identifier);
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 7cc9848..5a6f38c 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -50,6 +51,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestCatalogs;
import org.apache.iceberg.mr.TestHelper;
@@ -98,6 +100,10 @@ abstract class TestTables {
return tables;
}
+ public String catalogName() {
+ return catalog;
+ }
+
/**
* The location string needed to be provided for CREATE TABLE ... commands,
* like "LOCATION 'file:///tmp/warehouse/default/tablename'. Empty ("") if
LOCATION is not needed.
@@ -108,8 +114,8 @@ abstract class TestTables {
/**
* The table properties string needed for the CREATE TABLE ... commands,
- * like "TBLPROPERTIES('iceberg.catalog'='mycatalog')
- * @return
+ * like {@code TBLPROPERTIES('iceberg.catalog'='mycatalog')}
+ * @return the tables properties string, such as {@code
TBLPROPERTIES('iceberg.catalog'='mycatalog')}
*/
public String propertiesForCreateTableSQL(Map<String, String>
tableProperties) {
Map<String, String> properties = new HashMap<>(tableProperties);
@@ -371,10 +377,9 @@ abstract class TestTables {
@Override
public Map<String, String> properties() {
return ImmutableMap.of(
- String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog),
"custom",
- String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalog),
+ InputFormatConfig.catalogPropertyConfigKey(catalog,
CatalogProperties.CATALOG_IMPL),
TestCatalogs.CustomHadoopCatalog.class.getName(),
- String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE,
catalog),
+ InputFormatConfig.catalogPropertyConfigKey(catalog,
CatalogProperties.WAREHOUSE_LOCATION),
warehouseLocation
);
}
@@ -403,8 +408,10 @@ abstract class TestTables {
@Override
public Map<String, String> properties() {
return ImmutableMap.of(
- String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog),
"hadoop",
- String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE,
catalog), warehouseLocation
+ InputFormatConfig.catalogPropertyConfigKey(catalog,
CatalogUtil.ICEBERG_CATALOG_TYPE),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
+ InputFormatConfig.catalogPropertyConfigKey(catalog,
CatalogProperties.WAREHOUSE_LOCATION),
+ warehouseLocation
);
}
@@ -414,8 +421,8 @@ abstract class TestTables {
}
static class HadoopTestTables extends TestTables {
- HadoopTestTables(Configuration conf, TemporaryFolder temp, String
catalogName) {
- super(new HadoopTables(conf), temp, catalogName);
+ HadoopTestTables(Configuration conf, TemporaryFolder temp) {
+ super(new HadoopTables(conf), temp, Catalogs.ICEBERG_HADOOP_TABLE_NAME);
}
@Override
@@ -454,7 +461,8 @@ abstract class TestTables {
@Override
public Map<String, String> properties() {
- return
ImmutableMap.of(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
catalog), "hive");
+ return
ImmutableMap.of(InputFormatConfig.catalogPropertyConfigKey(catalog,
CatalogUtil.ICEBERG_CATALOG_TYPE),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
}
@Override
@@ -489,7 +497,7 @@ abstract class TestTables {
enum TestTableType {
HADOOP_TABLE {
public TestTables instance(Configuration conf, TemporaryFolder
temporaryFolder, String catalogName) {
- return new HadoopTestTables(conf, temporaryFolder, catalogName);
+ return new HadoopTestTables(conf, temporaryFolder);
}
},
HADOOP_CATALOG {
diff --git a/iceberg/patched-iceberg-core/pom.xml
b/iceberg/patched-iceberg-core/pom.xml
index c2d2d38..507818c 100644
--- a/iceberg/patched-iceberg-core/pom.xml
+++ b/iceberg/patched-iceberg-core/pom.xml
@@ -48,6 +48,11 @@
<groupId>org.apache.iceberg</groupId>
<artifactId>patched-iceberg-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <optional>true</optional>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -71,14 +76,11 @@
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<excludes>
- **/SnapshotProducer.class
- **/SnapshotSummary.class
- **/BaseMeta.class
- **/BaseMetadataTable.class
- **/StaticTableOperations.class
- **/CatalogProperties.class
- **/BaseMetastoreCatalog.class
- **/BaseMetastoreTableOperations.class
+ **/JdbcClientPool.class
+ **/JdbcUtil.class
+ **/CatalogUtil.class
+ **/ClientPool.class
+ **/ClientPoolImpl.class
</excludes>
</artifactItem>
</artifactItems>
diff --git
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java
new file mode 100644
index 0000000..bada6fd
--- /dev/null
+++
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.GC_ENABLED;
+import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
+
+public class CatalogUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(CatalogUtil.class);
+
+ /**
+ * Shortcut catalog property to load a catalog implementation through a
short type name,
+ * instead of specifying a full java class through {@link
CatalogProperties#CATALOG_IMPL}.
+ * Currently the following type to implementation mappings are supported:
+ * <ul>
+ * <li>hive: org.apache.iceberg.hive.HiveCatalog</li>
+ * <li>hadoop: org.apache.iceberg.hadoop.HadoopCatalog</li>
+ * </ul>
+ */
+ public static final String ICEBERG_CATALOG_TYPE = "type";
+ public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+ public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+ public static final String ICEBERG_CATALOG_HIVE =
"org.apache.iceberg.hive.HiveCatalog";
+ public static final String ICEBERG_CATALOG_HADOOP =
"org.apache.iceberg.hadoop.HadoopCatalog";
+
+ private CatalogUtil() {
+ }
+
+ /**
+ * Drops all data and metadata files referenced by TableMetadata.
+ * <p>
+ * This should be called by dropTable implementations to clean up table
files once the table has been dropped in the
+ * metastore.
+ *
+ * @param io a FileIO to use for deletes
+ * @param metadata the last valid TableMetadata instance for a dropped table.
+ */
+ public static void dropTableData(FileIO io, TableMetadata metadata) {
+ // Reads and deletes are done using
Tasks.foreach(...).suppressFailureWhenFinished to complete
+ // as much of the delete work as possible and avoid orphaned data or
manifest files.
+
+ Set<String> manifestListsToDelete = Sets.newHashSet();
+ Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
+ for (Snapshot snapshot : metadata.snapshots()) {
+ // add all manifests to the delete set because both data and delete
files should be removed
+ Iterables.addAll(manifestsToDelete, snapshot.allManifests());
+ // add the manifest list to the delete set, if present
+ if (snapshot.manifestListLocation() != null) {
+ manifestListsToDelete.add(snapshot.manifestListLocation());
+ }
+ }
+
+ LOG.info("Manifests to delete: {}", Joiner.on(",
").join(manifestsToDelete));
+
+ // run all of the deletes
+
+ boolean gcEnabled = PropertyUtil.propertyAsBoolean(metadata.properties(),
GC_ENABLED, GC_ENABLED_DEFAULT);
+
+ if (gcEnabled) {
+ // delete data files only if we are sure this won't corrupt other tables
+ deleteFiles(io, manifestsToDelete);
+ }
+
+ Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
+ .noRetry().suppressFailureWhenFinished()
+ .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest:
{}", manifest, exc))
+ .run(io::deleteFile);
+
+ Tasks.foreach(manifestListsToDelete)
+ .noRetry().suppressFailureWhenFinished()
+ .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list:
{}", list, exc))
+ .run(io::deleteFile);
+
+ Tasks.foreach(metadata.metadataFileLocation())
+ .noRetry().suppressFailureWhenFinished()
+ .onFailure((list, exc) -> LOG.warn("Delete failed for metadata file:
{}", list, exc))
+ .run(io::deleteFile);
+ }
+
+ @SuppressWarnings("DangerousStringInternUsage")
+ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
+ // keep track of deleted files in a map that can be cleaned up when memory
runs low
+ Map<String, Boolean> deletedFiles = new MapMaker()
+ .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
+ .weakKeys()
+ .makeMap();
+
+ Tasks.foreach(allManifests)
+ .noRetry().suppressFailureWhenFinished()
+ .executeWith(ThreadPools.getWorkerPool())
+ .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this
may cause orphaned data files", exc))
+ .run(manifest -> {
+ try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
+ for (ManifestEntry<?> entry : reader.entries()) {
+ // intern the file path because the weak key map uses identity
(==) instead of equals
+ String path = entry.file().path().toString().intern();
+ Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
+ if (alreadyDeleted == null || !alreadyDeleted) {
+ try {
+ io.deleteFile(path);
+ } catch (RuntimeException e) {
+ // this may happen if the map of deleted files gets cleaned
up by gc
+ LOG.warn("Delete failed for data file: {}", path, e);
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to read manifest file:
%s", manifest.path());
+ }
+ });
+ }
+
+ /**
+ * Load a custom catalog implementation.
+ * <p>
+ * The catalog must have a no-arg constructor.
+ * If the class implements {@link Configurable},
+ * a Hadoop config will be passed using {@link
Configurable#setConf(Configuration)}.
+ * {@link Catalog#initialize(String catalogName, Map options)} is called to
complete the initialization.
+ *
+ * @param impl catalog implementation full class name
+ * @param catalogName catalog name
+ * @param properties catalog properties
+ * @param hadoopConf hadoop configuration if needed
+ * @return initialized catalog object
+ * @throws IllegalArgumentException if no-arg constructor not found or error
during initialization
+ */
+ public static Catalog loadCatalog(
+ String impl,
+ String catalogName,
+ Map<String, String> properties,
+ Configuration hadoopConf) {
+ Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl
class name is null");
+ DynConstructors.Ctor<Catalog> ctor;
+ try {
+ ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot initialize Catalog implementation %s: %s", impl,
e.getMessage()), e);
+ }
+
+ Catalog catalog;
+ try {
+ catalog = ctor.newInstance();
+
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+ String.format("Cannot initialize Catalog, %s does not implement
Catalog.", impl), e);
+ }
+
+ if (catalog instanceof Configurable) {
+ ((Configurable) catalog).setConf(hadoopConf);
+ }
+
+ catalog.initialize(catalogName, properties);
+ return catalog;
+ }
+
+ /**
+ * Build an Iceberg {@link Catalog} based on a map of catalog properties and
optional Hadoop configuration.
+ * <p>
+ * This method examines both the {@link #ICEBERG_CATALOG_TYPE} and {@link
CatalogProperties#CATALOG_IMPL} properties
+ * to determine the catalog implementation to load.
+ * If neither property is specified, the Hive catalog is loaded by default.
+ *
+ * @param name catalog name
+ * @param options catalog properties
+ * @param conf Hadoop configuration
+ * @return initialized catalog
+ */
+ public static Catalog buildIcebergCatalog(String name, Map<String, String>
options, Configuration conf) {
+ String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
+ if (catalogImpl == null) {
+ String catalogType = PropertyUtil.propertyAsString(options,
ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
+ switch (catalogType.toLowerCase(Locale.ENGLISH)) {
+ case ICEBERG_CATALOG_TYPE_HIVE:
+ catalogImpl = ICEBERG_CATALOG_HIVE;
+ break;
+ case ICEBERG_CATALOG_TYPE_HADOOP:
+ catalogImpl = ICEBERG_CATALOG_HADOOP;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown catalog type: " +
catalogType);
+ }
+ } else {
+ String catalogType = options.get(ICEBERG_CATALOG_TYPE);
+ Preconditions.checkArgument(catalogType == null,
+ "Cannot create catalog %s, both type and catalog-impl are set:
type=%s, catalog-impl=%s",
+ name, catalogType, catalogImpl);
+ }
+
+ return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
+ }
+
+ /**
+ * Load a custom {@link FileIO} implementation.
+ * <p>
+ * The implementation must have a no-arg constructor.
+ * If the class implements {@link Configurable},
+ * a Hadoop config will be passed using {@link
Configurable#setConf(Configuration)}.
+ * {@link FileIO#initialize(Map properties)} is called to complete the
initialization.
+ *
+ * @param impl full class name of a custom FileIO implementation
+ * @param hadoopConf hadoop configuration
+ * @return an initialized FileIO instance
+ * @throws IllegalArgumentException if the implementation class is not found, a suitable
+ * constructor is missing, or
+ * the loaded class cannot be cast to FileIO
+ */
+ public static FileIO loadFileIO(
+ String impl,
+ Map<String, String> properties,
+ Configuration hadoopConf) {
+ LOG.info("Loading custom FileIO implementation: {}", impl);
+ DynConstructors.Ctor<FileIO> ctor;
+ try {
+ ctor = DynConstructors.builder(FileIO.class).impl(impl).buildChecked();
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot initialize FileIO, missing no-arg constructor: %s", impl),
e);
+ }
+
+ FileIO fileIO;
+ try {
+ fileIO = ctor.newInstance();
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+ String.format("Cannot initialize FileIO, %s does not implement
FileIO.", impl), e);
+ }
+
+ if (fileIO instanceof Configurable) {
+ ((Configurable) fileIO).setConf(hadoopConf);
+ }
+
+ fileIO.initialize(properties);
+ return fileIO;
+ }
+}
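buildIcebergCatalog above is the piece that enforces the new precedence rules: a configured
catalog-impl wins, otherwise the type property selects the built-in Hive or Hadoop catalog,
and configuring both fails the precondition check. A minimal usage sketch based on that code
(the catalog name and warehouse path are placeholders):

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.CatalogUtil;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class BuildCatalogSketch {
      static Catalog buildHadoopCatalog(Configuration conf, String warehouse) {
        // "type" = "hadoop" resolves to org.apache.iceberg.hadoop.HadoopCatalog (see the switch above)
        Map<String, String> options = ImmutableMap.of(
            CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
            CatalogProperties.WAREHOUSE_LOCATION, warehouse);
        // adding CatalogProperties.CATALOG_IMPL alongside "type" would trip the
        // Preconditions.checkArgument in buildIcebergCatalog above
        return CatalogUtil.buildIcebergCatalog("sketch_catalog", options, conf);
      }
    }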
diff --git
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java
similarity index 90%
rename from
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java
rename to
iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java
index 7f32cf9..117939a 100644
---
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.hive;
+package org.apache.iceberg;
public interface ClientPool<C, E extends Exception> {
interface Action<R, C, E extends Exception> {
@@ -25,4 +25,6 @@ public interface ClientPool<C, E extends Exception> {
}
<R> R run(Action<R, C, E> action) throws E, InterruptedException;
+
+ <R> R run(Action<R, C, E> action, boolean retry) throws E,
InterruptedException;
}
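The extra run(action, retry) overload added to the interface lets callers decide per call
whether a connection-level failure should trigger a reconnect-and-retry, instead of always
relying on the pool's default. A caller-side sketch, assuming a Hive metastore client pool as
the concrete C type (the pool instance itself is supplied elsewhere and is not part of this
diff):

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.iceberg.ClientPool;
    import org.apache.thrift.TException;

    class ClientPoolUsageSketch {
      // default behaviour: reconnect and retry once on connection-level exceptions
      static Table loadHmsTable(ClientPool<IMetaStoreClient, TException> pool)
          throws TException, InterruptedException {
        return pool.run(client -> client.getTable("default", "tbl"));
      }

      // per-call override: surface connection failures directly instead of retrying
      static Table loadHmsTableNoRetry(ClientPool<IMetaStoreClient, TException> pool)
          throws TException, InterruptedException {
        return pool.run(client -> client.getTable("default", "tbl"), false);
      }
    }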
diff --git
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
similarity index 90%
rename from
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
rename to
iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index d1a44d3..18f7afa 100644
---
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
+++
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -17,12 +17,11 @@
* under the License.
*/
-package org.apache.iceberg.hive;
+package org.apache.iceberg;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;
-import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,25 +33,32 @@ public abstract class ClientPoolImpl<C, E extends
Exception> implements Closeabl
private final Deque<C> clients;
private final Class<? extends E> reconnectExc;
private final Object signal = new Object();
+ private final boolean retryByDefault;
private volatile int currentSize;
private boolean closed;
- ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc) {
+ public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc, boolean
retryByDefault) {
this.poolSize = poolSize;
this.reconnectExc = reconnectExc;
this.clients = new ArrayDeque<>(poolSize);
this.currentSize = 0;
this.closed = false;
+ this.retryByDefault = retryByDefault;
}
@Override
public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
+ return run(action, retryByDefault);
+ }
+
+ @Override
+ public <R> R run(Action<R, C, E> action, boolean retry) throws E,
InterruptedException {
C client = get();
try {
return action.run(client);
} catch (Exception exc) {
- if (isConnectionException(exc)) {
+ if (retry && isConnectionException(exc)) {
try {
client = reconnect(client);
} catch (Exception ignored) {
@@ -138,8 +144,7 @@ public abstract class ClientPoolImpl<C, E extends
Exception> implements Closeabl
}
}
- @VisibleForTesting
- int poolSize() {
+ public int poolSize() {
return poolSize;
}
}
diff --git
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
new file mode 100644
index 0000000..ba5edf5
--- /dev/null
+++
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPoolImpl;
+
+class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
+
+ private final String dbUrl;
+ private final Map<String, String> properties;
+
+ JdbcClientPool(String dbUrl, Map<String, String> props) {
+
this(Integer.parseInt(props.getOrDefault(CatalogProperties.CLIENT_POOL_SIZE,
+ String.valueOf(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT))), dbUrl,
props);
+ }
+
+ JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
+ super(poolSize, SQLNonTransientConnectionException.class, true);
+ properties = props;
+ this.dbUrl = dbUrl;
+ }
+
+ @Override
+ protected Connection newClient() {
+ try {
+ Properties dbProps = JdbcUtil.filterAndRemovePrefix(properties,
JdbcCatalog.PROPERTY_PREFIX);
+ return DriverManager.getConnection(dbUrl, dbProps);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to connect: %s", dbUrl);
+ }
+ }
+
+ @Override
+ protected Connection reconnect(Connection client) {
+ close(client);
+ return newClient();
+ }
+
+ @Override
+ protected void close(Connection client) {
+ try {
+ client.close();
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to close connection");
+ }
+ }
+}
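JdbcClientPool above shows how the relocated ClientPoolImpl is meant to be extended: the pool
size comes from CatalogProperties.CLIENT_POOL_SIZE, connections are created lazily in
newClient(), and SQLNonTransientConnectionException is treated as a reconnectable failure. A
rough caller-side sketch (placeholders throughout; such code would have to live in the
org.apache.iceberg.jdbc package because the classes are package-private):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Map;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class JdbcClientPoolSketch {
      static boolean tableExists(String dbUrl) throws SQLException, InterruptedException {
        Map<String, String> props = ImmutableMap.of(CatalogProperties.CLIENT_POOL_SIZE, "2");
        // the owning catalog is expected to close the pool once it is no longer needed
        JdbcClientPool pool = new JdbcClientPool(dbUrl, props);
        // run(action) hands out a Connection, and reconnects + retries once if the action
        // fails with SQLNonTransientConnectionException (retryByDefault is true above)
        return pool.run(conn -> {
          try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.GET_TABLE_SQL)) {
            sql.setString(1, "sketch_catalog");
            sql.setString(2, "db");
            sql.setString(3, "tbl");
            return sql.executeQuery().next();
          }
        });
      }
    }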
diff --git
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
new file mode 100644
index 0000000..0b55839
--- /dev/null
+++
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.jdbc;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+final class JdbcUtil {
+ protected static final String CATALOG_TABLE_NAME = "iceberg_tables";
+ protected static final String CATALOG_NAME = "catalog_name";
+ protected static final String TABLE_NAMESPACE = "table_namespace";
+ protected static final String TABLE_NAME = "table_name";
+ protected static final String METADATA_LOCATION = "metadata_location";
+ protected static final String PREVIOUS_METADATA_LOCATION =
"previous_metadata_location";
+ public static final String DO_COMMIT_SQL = "UPDATE " + CATALOG_TABLE_NAME +
+ " SET " + METADATA_LOCATION + " = ? , " + PREVIOUS_METADATA_LOCATION + "
= ? " +
+ " WHERE " + CATALOG_NAME + " = ? AND " +
+ TABLE_NAMESPACE + " = ? AND " +
+ TABLE_NAME + " = ? AND " +
+ METADATA_LOCATION + " = ?";
+ protected static final String CREATE_CATALOG_TABLE =
+ "CREATE TABLE " + CATALOG_TABLE_NAME +
+ "(" +
+ CATALOG_NAME + " VARCHAR(255) NOT NULL," +
+ TABLE_NAMESPACE + " VARCHAR(255) NOT NULL," +
+ TABLE_NAME + " VARCHAR(255) NOT NULL," +
+ METADATA_LOCATION + " VARCHAR(5500)," +
+ PREVIOUS_METADATA_LOCATION + " VARCHAR(5500)," +
+ "PRIMARY KEY (" + CATALOG_NAME + ", " + TABLE_NAMESPACE + ", " +
TABLE_NAME + ")" +
+ ")";
+ protected static final String GET_TABLE_SQL = "SELECT * FROM " +
CATALOG_TABLE_NAME +
+ " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " +
TABLE_NAME + " = ? ";
+ protected static final String LIST_TABLES_SQL = "SELECT * FROM " +
CATALOG_TABLE_NAME +
+ " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ?";
+ protected static final String RENAME_TABLE_SQL = "UPDATE " +
CATALOG_TABLE_NAME +
+ " SET " + TABLE_NAMESPACE + " = ? , " + TABLE_NAME + " = ? " +
+ " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " +
TABLE_NAME + " = ? ";
+ protected static final String DROP_TABLE_SQL = "DELETE FROM " +
CATALOG_TABLE_NAME +
+ " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " +
TABLE_NAME + " = ? ";
+ protected static final String GET_NAMESPACE_SQL = "SELECT " +
TABLE_NAMESPACE + " FROM " + CATALOG_TABLE_NAME +
+ " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " LIKE ?
LIMIT 1";
+ protected static final String LIST_NAMESPACES_SQL = "SELECT DISTINCT " +
TABLE_NAMESPACE +
+ " FROM " + CATALOG_TABLE_NAME +
+ " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " LIKE ?";
+ protected static final String DO_COMMIT_CREATE_TABLE_SQL = "INSERT INTO " +
CATALOG_TABLE_NAME +
+ " (" + CATALOG_NAME + ", " + TABLE_NAMESPACE + ", " + TABLE_NAME +
+ ", " + METADATA_LOCATION + ", " + PREVIOUS_METADATA_LOCATION + ") " +
+ " VALUES (?,?,?,?,null)";
+ private static final Joiner JOINER_DOT = Joiner.on('.');
+ private static final Splitter SPLITTER_DOT = Splitter.on('.');
+
+ private JdbcUtil() {
+ }
+
+ public static Namespace stringToNamespace(String namespace) {
+ Preconditions.checkArgument(namespace != null, "Invalid namespace %s",
namespace);
+ return Namespace.of(Iterables.toArray(SPLITTER_DOT.split(namespace),
String.class));
+ }
+
+ public static String namespaceToString(Namespace namespace) {
+ return JOINER_DOT.join(namespace.levels());
+ }
+
+ public static TableIdentifier stringToTableIdentifier(String tableNamespace,
String tableName) {
+ return TableIdentifier.of(JdbcUtil.stringToNamespace(tableNamespace),
tableName);
+ }
+
+ public static Properties filterAndRemovePrefix(Map<String, String>
properties,
+ String prefix) {
+ Properties result = new Properties();
+ properties.forEach((key, value) -> {
+ if (key.startsWith(prefix)) {
+ result.put(key.substring(prefix.length()), value);
+ }
+ });
+
+ return result;
+ }
+}
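filterAndRemovePrefix is what turns prefixed catalog options into plain JDBC connection
properties: only entries whose keys start with the prefix are kept, and the prefix is stripped
from the key. A small sketch of that behaviour (keys and values are invented for illustration,
and the code would need to sit in org.apache.iceberg.jdbc since JdbcUtil is package-private):

    import java.util.Map;
    import java.util.Properties;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    class JdbcUtilSketch {
      static Properties connectionProps() {
        Map<String, String> catalogProps = ImmutableMap.of(
            "jdbc.user", "hive",        // kept, key becomes "user"
            "jdbc.password", "secret",  // kept, key becomes "password"
            "warehouse", "/tmp/wh");    // no "jdbc." prefix, filtered out
        return JdbcUtil.filterAndRemovePrefix(catalogProps, "jdbc.");
      }
    }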
diff --git a/iceberg/pom.xml b/iceberg/pom.xml
index 4a2f2f5..547baaf 100644
--- a/iceberg/pom.xml
+++ b/iceberg/pom.xml
@@ -38,6 +38,8 @@
<spotless.maven.plugin.version>2.5.0</spotless.maven.plugin.version>
<google.errorprone.javac.version>9+181-r4173-1</google.errorprone.javac.version>
<google.errorprone.version>2.5.1</google.errorprone.version>
+ <assertj.version>3.19.0</assertj.version>
+ <junit.jupiter.version>5.7.2</junit.jupiter.version>
</properties>
<modules>
@@ -190,6 +192,17 @@
<classifier>tests</classifier>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>${junit.jupiter.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.esotericsoftware</groupId>