This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1e89a3b Use Hive 1.2.1 and add a client pool (#166)
1e89a3b is described below
commit 1e89a3bb11644241bdc7b75b72570c01e96c40c7
Author: Ryan Blue <[email protected]>
AuthorDate: Thu May 2 11:35:46 2019 -0700
Use Hive 1.2.1 and add a client pool (#166)
---
build.gradle | 10 +-
.../java/org/apache/iceberg/hive/ClientPool.java | 139 +++++++++++++++++++++
.../org/apache/iceberg/hive/HiveClientPool.java | 63 ++++++++++
.../apache/iceberg/hive/HiveTableOperations.java | 54 ++++----
.../java/org/apache/iceberg/hive/HiveTables.java | 37 ++----
.../apache/iceberg/hive/RuntimeMetaException.java | 35 ++++++
.../org/apache/iceberg/hive/HiveTableBaseTest.java | 135 +++++++++-----------
.../org/apache/iceberg/hive/HiveTablesTest.java | 43 +++----
.../src/test/resources/hive-schema-3.1.0.derby.sql | 2 +-
9 files changed, 371 insertions(+), 147 deletions(-)
diff --git a/build.gradle b/build.gradle
index 6ac2381..ae8920e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -78,7 +78,7 @@ subprojects {
avroVersion = '1.8.2'
orcVersion = '1.4.2'
parquetVersion = '1.10.0'
- hiveVersion = '3.1.0'
+ hiveVersion = '1.2.1'
jacksonVersion = '2.6.7'
@@ -97,6 +97,7 @@ subprojects {
testCompile 'org.slf4j:slf4j-simple:1.7.5'
testCompile 'org.mockito:mockito-all:1.10.19'
}
+
publishing {
publications {
nebula(MavenPublication) {
@@ -207,12 +208,15 @@ project(':iceberg-hive') {
compileOnly "org.apache.avro:avro:$avroVersion"
- compileOnly("org.apache.hive:hive-standalone-metastore:$hiveVersion") {
+ compileOnly("org.apache.hive:hive-metastore:$hiveVersion") {
exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
testCompile("org.apache.hive:hive-exec:$hiveVersion") {
exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+ exclude group: 'org.pentaho' // missing dependency
}
compileOnly("org.apache.hadoop:hadoop-client:$hadoopVersion") {
@@ -368,7 +372,7 @@ project(':iceberg-presto-runtime') {
shadow "org.apache.parquet:parquet-avro:$parquetVersion"
shadow "org.apache.avro:avro:$avroVersion"
- shadow ("org.apache.hive:hive-standalone-metastore:$hiveVersion") {
+ shadow ("org.apache.hive:hive-metastore:$hiveVersion") {
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
// exclude group: 'org.apache.orc', module: 'orc-core'
}
diff --git a/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
b/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
new file mode 100644
index 0000000..1baffb2
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+abstract class ClientPool<C, E extends Exception> implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(ClientPool.class);
+
+ private final int poolSize;
+ private final Deque<C> clients;
+ private final Class<E> reconnectExc;
+ private final Object signal = new Object();
+ private volatile int currentSize;
+ private boolean closed;
+ private int runs = 0;
+
+ ClientPool(int poolSize, Class<E> reconnectExc) {
+ this.poolSize = poolSize;
+ this.reconnectExc = reconnectExc;
+ this.clients = new ArrayDeque<>(poolSize);
+ this.currentSize = 0;
+ this.closed = false;
+ }
+
+ interface Action<R, C, E extends Exception> {
+ R run(C client) throws E;
+ }
+
+ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
+ runs += 1;
+ C client = get();
+ try {
+ return action.run(client);
+
+ } catch (Exception exc) {
+ if (reconnectExc.isInstance(exc)) {
+ try {
+ client = reconnect(client);
+ } catch (Exception ignored) {
+ // if reconnection throws any exception, rethrow the original failure
+ throw reconnectExc.cast(exc);
+ }
+
+ return action.run(client);
+ }
+
+ throw exc;
+
+ } finally {
+ release(client);
+ }
+ }
+
+ protected abstract C newClient();
+
+ protected abstract C reconnect(C client);
+
+ protected abstract void close(C client);
+
+ private C get() throws InterruptedException {
+ Preconditions.checkState(!closed, "Cannot get a client from a closed pool");
+ while (true) {
+ if (!clients.isEmpty() || currentSize < poolSize) {
+ synchronized (this) {
+ if (!clients.isEmpty()) {
+ return clients.removeFirst();
+ } else if (currentSize < poolSize) {
+ currentSize += 1;
+ return newClient();
+ }
+ }
+ }
+ synchronized (signal) {
+ // wake every second in case this missed the signal
+ signal.wait(1000);
+ }
+ }
+ }
+
+ private void release(C client) {
+ synchronized (this) {
+ clients.addFirst(client);
+ }
+ synchronized (signal) {
+ signal.notify();
+ }
+ }
+
+ @Override
+ public void close() {
+ this.closed = true;
+ try {
+ while (currentSize > 0) {
+ if (!clients.isEmpty()) {
+ synchronized (this) {
+ if (!clients.isEmpty()) {
+ C client = clients.removeFirst();
+ close(client);
+ currentSize -= 1;
+ }
+ }
+ }
+ if (clients.isEmpty() && currentSize > 0) {
+ // wake every second in case this missed the signal
+ synchronized (signal) {
+ signal.wait(1000);
+ }
+ }
+ }
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.");
+ }
+ }
+}
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
b/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
new file mode 100644
index 0000000..8527555
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+
+class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {
+ private final HiveConf hiveConf;
+
+ HiveClientPool(Configuration conf) {
+ this(conf.getInt("iceberg.hive.client-pool-size", 5), conf);
+ }
+
+ HiveClientPool(int poolSize, Configuration conf) {
+ super(poolSize, TException.class);
+ this.hiveConf = new HiveConf(conf, HiveClientPool.class);
+ }
+
+ @Override
+ protected HiveMetaStoreClient newClient() {
+ try {
+ return new HiveMetaStoreClient(hiveConf);
+ } catch (MetaException e) {
+ throw new RuntimeMetaException(e, "Failed to connect to Hive Metastore");
+ }
+ }
+
+ @Override
+ protected HiveMetaStoreClient reconnect(HiveMetaStoreClient client) {
+ try {
+ client.reconnect();
+ } catch (MetaException e) {
+ throw new RuntimeMetaException(e, "Failed to reconnect to Hive Metastore");
+ }
+ return client;
+ }
+
+ @Override
+ protected void close(HiveMetaStoreClient client) {
+ client.close();
+ }
+}
diff --git
a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index c157756..fa7f1df 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -27,7 +27,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
@@ -37,11 +36,8 @@ import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SerdeType;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
@@ -56,20 +52,17 @@ import static java.lang.String.format;
/**
* TODO we should be able to extract some more commonalities to
BaseMetastoreTableOperations to
* avoid code duplication between this class and Metacat Tables.
- *
- * Note! This class is not thread-safe as {@link ThriftHiveMetastore.Client}
does not behave
- * correctly in a multi-threaded environment.
*/
public class HiveTableOperations extends BaseMetastoreTableOperations {
private static final Logger LOG =
LoggerFactory.getLogger(HiveTableOperations.class);
- private final ThriftHiveMetastore.Client metaStoreClient;
+ private final HiveClientPool metaClients;
private final String database;
private final String tableName;
- protected HiveTableOperations(Configuration conf, ThriftHiveMetastore.Client
metaStoreClient, String database, String table) {
+ protected HiveTableOperations(Configuration conf, HiveClientPool
metaClients, String database, String table) {
super(conf);
- this.metaStoreClient = metaStoreClient;
+ this.metaClients = metaClients;
this.database = database;
this.tableName = table;
}
@@ -78,7 +71,7 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
public TableMetadata refresh() {
String metadataLocation = null;
try {
- final Table table = metaStoreClient.get_table(database, tableName);
+ final Table table = metaClients.run(client -> client.getTable(database,
tableName));
String tableType = table.getParameters().get(TABLE_TYPE_PROP);
if (tableType == null ||
!tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE)) {
@@ -96,7 +89,11 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
}
} catch (TException e) {
- throw new RuntimeException(format("Failed to get table info from metastore %s.%s", database, tableName));
+ throw new RuntimeException(format("Failed to get table info from metastore %s.%s", database, tableName), e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during refresh", e);
}
refreshFromMetadataLocation(metadataLocation);
@@ -108,7 +105,7 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
public void commit(TableMetadata base, TableMetadata metadata) {
// if the metadata is already out of date, reject it
if (base != current()) {
- throw new CommitFailedException(format("stale table metadata for %s.%s", database, tableName));
+ throw new CommitFailedException("Cannot commit: stale table metadata for %s.%s", database, tableName);
}
// if the metadata is not changed, return early
@@ -126,7 +123,7 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
// TODO add lock heart beating for cases where default lock timeout is
too low.
Table tbl;
if (base != null) {
- tbl = metaStoreClient.get_table(database, tableName);
+ tbl = metaClients.run(client -> client.getTable(database, tableName));
} else {
final long currentTimeMillis = System.currentTimeMillis();
tbl = new Table(tableName,
@@ -153,13 +150,24 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
setParameters(newMetadataLocation, tbl);
if (base != null) {
- metaStoreClient.alter_table(database, tableName, tbl);
+ metaClients.run(client -> {
+ client.alter_table(database, tableName, tbl);
+ return null;
+ });
} else {
- metaStoreClient.create_table(tbl);
+ metaClients.run(client -> {
+ client.createTable(tbl);
+ return null;
+ });
}
threw = false;
} catch (TException | UnknownHostException e) {
throw new RuntimeException(format("Metastore operation failed for %s.%s", database, tableName), e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during commit", e);
+
} finally {
if (threw) {
// if anything went wrong, clean up the uncommitted metadata file
@@ -196,7 +204,6 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileInputFormat");
storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileOutputFormat");
SerDeInfo serDeInfo = new SerDeInfo();
- serDeInfo.setSerdeType(SerdeType.HIVE);
serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
storageDescriptor.setSerdeInfo(serDeInfo);
return storageDescriptor;
@@ -206,18 +213,18 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
return schema.columns().stream().map(col -> new FieldSchema(col.name(),
HiveTypeConverter.convert(col.type()), "")).collect(Collectors.toList());
}
- private long acquireLock() throws UnknownHostException, TException {
+ private long acquireLock() throws UnknownHostException, TException,
InterruptedException {
final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE,
LockLevel.TABLE, database);
lockComponent.setTablename(tableName);
final LockRequest lockRequest = new
LockRequest(Lists.newArrayList(lockComponent),
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
- LockResponse lockResponse = metaStoreClient.lock(lockRequest);
+ LockResponse lockResponse = metaClients.run(client ->
client.lock(lockRequest));
LockState state = lockResponse.getState();
long lockId = lockResponse.getLockid();
//TODO add timeout
while (state.equals(LockState.WAITING)) {
- lockResponse = metaStoreClient.check_lock(new
CheckLockRequest(lockResponse.getLockid()));
+ lockResponse = metaClients.run(client -> client.checkLock(lockId));
state = lockResponse.getState();
}
@@ -231,8 +238,11 @@ public class HiveTableOperations extends
BaseMetastoreTableOperations {
private void unlock(Optional<Long> lockId) {
if (lockId.isPresent()) {
try {
- metaStoreClient.unlock(new UnlockRequest(lockId.get()));
- } catch (TException e) {
+ metaClients.run(client -> {
+ client.unlock(lockId.get());
+ return null;
+ });
+ } catch (Exception e) {
throw new RuntimeException(format("Failed to unlock %s.%s", database, tableName) , e);
}
}
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTables.java
b/hive/src/main/java/org/apache/iceberg/hive/HiveTables.java
index ea3d32f..7332264 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTables.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTables.java
@@ -16,33 +16,23 @@
package org.apache.iceberg.hive;
import com.google.common.base.Splitter;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseMetastoreTables;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import static
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CLIENT_SOCKET_TIMEOUT;
-import static
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.THRIFT_URIS;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
-public class HiveTables extends BaseMetastoreTables {
+public class HiveTables extends BaseMetastoreTables implements Closeable {
private static final Splitter DOT = Splitter.on('.').limit(2);
- private Configuration conf;
+ private final HiveClientPool clients;
public HiveTables(Configuration conf) {
super(conf);
- this.conf = conf;
+ this.clients = new HiveClientPool(2, conf);
}
@Override
@@ -70,18 +60,11 @@ public class HiveTables extends BaseMetastoreTables {
@Override
public BaseMetastoreTableOperations newTableOps(Configuration conf, String
database, String table) {
- return new HiveTableOperations(conf, getClient(), database, table);
+ return new HiveTableOperations(conf, clients, database, table);
}
- private ThriftHiveMetastore.Client getClient() {
- final URI metastoreUri = URI.create(MetastoreConf.getAsString(conf,
THRIFT_URIS));
- final int socketTimeOut = (int) MetastoreConf.getTimeVar(conf,
CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
- TTransport transport = new TSocket(metastoreUri.getHost(),
metastoreUri.getPort(), socketTimeOut);
- try {
- transport.open();
- } catch (TTransportException e) {
- throw new RuntimeException("failed to open socket for " + metastoreUri + " with timeoutMillis " + socketTimeOut);
- }
- return new ThriftHiveMetastore.Client(new TBinaryProtocol(transport));
+ @Override
+ public void close() {
+ clients.close();
}
}
diff --git
a/hive/src/main/java/org/apache/iceberg/hive/RuntimeMetaException.java
b/hive/src/main/java/org/apache/iceberg/hive/RuntimeMetaException.java
new file mode 100644
index 0000000..94e3e7b
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/RuntimeMetaException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * Exception used to wrap {@link MetaException} as a {@link RuntimeException}
and add context.
+ */
+public class RuntimeMetaException extends RuntimeException {
+ public RuntimeMetaException(MetaException cause) {
+ super(cause);
+ }
+
+ public RuntimeMetaException(MetaException cause, String message, Object...
args) {
+ super(String.format(message, args), cause);
+ }
+}
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
index 4b6fc15..0f43a4f 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
@@ -20,7 +20,6 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
-import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.nio.file.Paths;
import java.sql.Connection;
@@ -41,36 +40,30 @@ import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
-import org.apache.hadoop.hive.metastore.api.Catalog;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
-import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute;
import static java.nio.file.attribute.PosixFilePermissions.fromString;
-import static
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CONNECT_URL_KEY;
-import static
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.THRIFT_URIS;
-import static
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.WAREHOUSE;
import static org.apache.iceberg.PartitionSpec.builderFor;
import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
-class HiveTableBaseTest {
+public class HiveTableBaseTest {
static final String DB_NAME = "hivedb";
static final String TABLE_NAME = "tbl";
@@ -84,37 +77,34 @@ class HiveTableBaseTest {
private static final PartitionSpec partitionSpec =
builderFor(schema).identity("id").build();
- Configuration hiveConf;
- HiveMetaStoreClient metastoreClient;
- private File hiveLocalDir;
+ private static HiveConf hiveConf;
+ private static File hiveLocalDir;
- private ExecutorService executorService;
- private TServer server;
+ private static ExecutorService executorService;
+ private static TServer server;
- @Before
- public void setup() throws IOException,
- TException,
- InvocationTargetException,
- NoSuchMethodException,
- IllegalAccessException,
- NoSuchFieldException, SQLException {
- this.executorService = Executors.newSingleThreadExecutor();
- hiveLocalDir = createTempDirectory("hive",
asFileAttribute(fromString("rwxrwxrwx"))).toFile();
+ static HiveMetaStoreClient metastoreClient;
+
+ @BeforeClass
+ public static void startMetastore() throws Exception {
+ HiveTableBaseTest.executorService = Executors.newSingleThreadExecutor();
+ HiveTableBaseTest.hiveLocalDir = createTempDirectory("hive",
asFileAttribute(fromString("rwxrwxrwx"))).toFile();
File derbyLogFile = new File(hiveLocalDir, "derby.log");
System.setProperty("derby.stream.error.file",
derbyLogFile.getAbsolutePath());
setupDB("jdbc:derby:" + getDerbyPath() + ";create=true");
- this.server = thriftServer();
+ HiveTableBaseTest.server = thriftServer();
executorService.submit(() -> server.serve());
- this.metastoreClient = new HiveMetaStoreClient(this.hiveConf);
- createIfNotExistsCatalog("hive");
- this.metastoreClient.createDatabase(new Database(DB_NAME, "description",
getDBPath(), new HashMap<>()));
- new HiveTables(this.hiveConf).create(schema, partitionSpec, DB_NAME,
TABLE_NAME);
+ HiveTableBaseTest.metastoreClient = new HiveMetaStoreClient(hiveConf);
+ metastoreClient.createDatabase(new Database(DB_NAME, "description",
getDBPath(), new HashMap<>()));
}
- @After
- public void cleanup() {
+ @AfterClass
+ public static void stopMetastore() {
+ metastoreClient.close();
+ HiveTableBaseTest.metastoreClient = null;
+
if (server != null) {
server.stop();
}
@@ -126,37 +116,44 @@ class HiveTableBaseTest {
}
}
- private HiveConf hiveConf(Configuration conf, int port) {
- final HiveConf hiveConf = new HiveConf(conf, this.getClass());
- hiveConf.set(THRIFT_URIS.getVarname(), "thrift://localhost:" + port);
- hiveConf.set(WAREHOUSE.getVarname(), "file:" +
hiveLocalDir.getAbsolutePath());
- hiveConf.set(WAREHOUSE.getHiveName(), "file:" +
hiveLocalDir.getAbsolutePath());
- hiveConf.set(CONNECT_URL_KEY.getVarname(), "jdbc:derby:" + getDerbyPath()
+ ";create=true");
+ HiveTables tables;
+
+ @Before
+ public void createTestTable() throws Exception {
+ this.tables = new HiveTables(hiveConf);
+ tables.create(schema, partitionSpec, DB_NAME, TABLE_NAME);
+ }
+
+ @After
+ public void dropTestTable() throws Exception {
+ metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+ tables.close();
+ this.tables = null;
+ }
+
+ private static HiveConf hiveConf(Configuration conf, int port) {
+ final HiveConf hiveConf = new HiveConf(conf, HiveTableBaseTest.class);
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname,
"thrift://localhost:" + port);
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" +
hiveLocalDir.getAbsolutePath());
return hiveConf;
}
- private String getDerbyPath() {
+ private static String getDerbyPath() {
final File metastore_db = new File(hiveLocalDir, "metastore_db");
return metastore_db.getPath();
}
- private TServer thriftServer() throws IOException,
- TTransportException,
- MetaException,
- InvocationTargetException,
- NoSuchMethodException,
- IllegalAccessException,
- NoSuchFieldException {
- final TServerSocketKeepAlive socket = new TServerSocketKeepAlive(new
TServerSocket(0));
- this.hiveConf = hiveConf(new Configuration(),
socket.getServerSocket().getLocalPort());
- HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new
db based metaserver", hiveConf);
- IHMSHandler handler = RetryingHMSHandler.getProxy(hiveConf, baseHandler,
true);
- final TTransportFactory transportFactory = new TTransportFactory();
- final TSetIpAddressProcessor<IHMSHandler> processor = new
TSetIpAddressProcessor<>(handler);
+ private static TServer thriftServer() throws Exception {
+ TServerSocket socket = new TServerSocket(0);
+ HiveTableBaseTest.hiveConf = hiveConf(new Configuration(),
socket.getServerSocket().getLocalPort());
+ HiveConf serverConf = new HiveConf(hiveConf);
+ serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
"jdbc:derby:" + getDerbyPath() + ";create=true");
+ HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new
db based metaserver", serverConf);
+ IHMSHandler handler = RetryingHMSHandler.getProxy(serverConf, baseHandler,
false);
TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket)
- .processor(processor)
- .transportFactory(transportFactory)
+ .processor(new TSetIpAddressProcessor<>(handler))
+ .transportFactory(new TTransportFactory())
.protocolFactory(new TBinaryProtocol.Factory())
.minWorkerThreads(3)
.maxWorkerThreads(5);
@@ -164,55 +161,47 @@ class HiveTableBaseTest {
return new TThreadPoolServer(args);
}
- private void setupDB(String dbURL) throws SQLException, IOException {
+ private static void setupDB(String dbURL) throws SQLException, IOException {
Connection connection = DriverManager.getConnection(dbURL);
ScriptRunner scriptRunner = new ScriptRunner(connection, true, true);
- URL hiveSqlScript =
getClass().getClassLoader().getResource("hive-schema-3.1.0.derby.sql");
- Reader reader = new BufferedReader(new FileReader(new
File(hiveSqlScript.getFile())));
- scriptRunner.runScript(reader);
- }
-
- private void createIfNotExistsCatalog(String catalogName) throws TException {
- try {
- metastoreClient.getCatalog(catalogName);
- } catch(NoSuchObjectException e) {
- String catalogPath = Paths.get(hiveLocalDir.getAbsolutePath(),
catalogName + ".catalog").toString();
- metastoreClient.createCatalog(new Catalog(catalogName, catalogPath));
+ URL hiveSqlScript =
HiveTableBaseTest.class.getClassLoader().getResource("hive-schema-3.1.0.derby.sql");
+ try (Reader reader = new BufferedReader(new FileReader(new
File(hiveSqlScript.getFile())))) {
+ scriptRunner.runScript(reader);
}
}
- private String getDBPath() {
+ private static String getDBPath() {
return Paths.get(hiveLocalDir.getAbsolutePath(), DB_NAME +
".db").toAbsolutePath().toString();
}
- String getTableBasePath(String tableName) {
+ private static String getTableBasePath(String tableName) {
return Paths.get(getDBPath(), tableName).toAbsolutePath().toString();
}
- String getTableLocation(String tableName) {
+ protected static String getTableLocation(String tableName) {
return new Path("file", null,
Paths.get(getTableBasePath(tableName)).toString()).toString();
}
- String metadataLocation(String tableName) {
+ private static String metadataLocation(String tableName) {
return Paths.get(getTableBasePath(tableName), "metadata").toString();
}
- private List<String> metadataFiles(String tableName) {
+ private static List<String> metadataFiles(String tableName) {
return Arrays.stream(new File(metadataLocation(tableName)).listFiles())
.map(File::getAbsolutePath)
.collect(Collectors.toList());
}
- List<String> metadataVersionFiles(String tableName) {
+ protected static List<String> metadataVersionFiles(String tableName) {
return filterByExtension(tableName, getFileExtension(hiveConf));
}
- List<String> manifestFiles(String tableName) {
+ protected static List<String> manifestFiles(String tableName) {
return filterByExtension(tableName, ".avro");
}
- private List<String> filterByExtension(String tableName, String extension) {
+ private static List<String> filterByExtension(String tableName, String
extension) {
return metadataFiles(tableName)
.stream()
.filter(f -> f.endsWith(extension))
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
b/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
index 222565c..607e060 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
@@ -22,12 +22,12 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
@@ -43,7 +43,7 @@ public class HiveTablesTest extends HiveTableBaseTest {
@Test
public void testCreate() throws TException {
// Table should be created in hive metastore
- final Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+ final org.apache.hadoop.hive.metastore.api.Table table =
metastoreClient.getTable(DB_NAME, TABLE_NAME);
// check parameters are in expected state
final Map<String, String> parameters = table.getParameters();
@@ -61,25 +61,25 @@ public class HiveTablesTest extends HiveTableBaseTest {
Assert.assertEquals(1, metadataVersionFiles(TABLE_NAME).size());
Assert.assertEquals(0, manifestFiles(TABLE_NAME).size());
- final org.apache.iceberg.Table icebergTable = new
HiveTables(hiveConf).load(DB_NAME, TABLE_NAME);
+ final Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
// Iceberg schema should match the loaded table
Assert.assertEquals(schema.asStruct(), icebergTable.schema().asStruct());
}
@Test
public void testExistingTableUpdate() throws TException {
- org.apache.iceberg.Table icebergTable = new
HiveTables(hiveConf).load(DB_NAME, TABLE_NAME);
+ Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
// add a column
icebergTable.updateSchema().addColumn("data",
Types.LongType.get()).commit();
- icebergTable = new HiveTables(hiveConf).load(DB_NAME, TABLE_NAME);
+ icebergTable = tables.load(DB_NAME, TABLE_NAME);
// Only 2 snapshot files should exist and no manifests should exist
Assert.assertEquals(2, metadataVersionFiles(TABLE_NAME).size());
Assert.assertEquals(0, manifestFiles(TABLE_NAME).size());
Assert.assertEquals(altered.asStruct(), icebergTable.schema().asStruct());
- final Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+ final org.apache.hadoop.hive.metastore.api.Table table =
metastoreClient.getTable(DB_NAME, TABLE_NAME);
final List<String> hiveColumns = table.getSd().getCols().stream().map(f ->
f.getName()).collect(Collectors.toList());
final List<String> icebergColumns = altered.columns().stream().map(f ->
f.name()).collect(Collectors.toList());
Assert.assertEquals(icebergColumns, hiveColumns);
@@ -87,9 +87,9 @@ public class HiveTablesTest extends HiveTableBaseTest {
@Test(expected = CommitFailedException.class)
public void testFailure() throws TException {
- org.apache.iceberg.Table icebergTable = new
HiveTables(hiveConf).load(DB_NAME, TABLE_NAME);
- final Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
- final String dummyLocation = "dummylocation";
+ Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+ org.apache.hadoop.hive.metastore.api.Table table =
metastoreClient.getTable(DB_NAME, TABLE_NAME);
+ String dummyLocation = "dummylocation";
table.getParameters().put(METADATA_LOCATION_PROP, dummyLocation);
metastoreClient.alter_table(DB_NAME, TABLE_NAME, table);
icebergTable.updateSchema()
@@ -99,9 +99,8 @@ public class HiveTablesTest extends HiveTableBaseTest {
@Test
public void testConcurrentFastAppends() {
- HiveTables hiveTables = new HiveTables(hiveConf);
- org.apache.iceberg.Table icebergTable = hiveTables.load(DB_NAME,
TABLE_NAME);
- org.apache.iceberg.Table anotherIcebergTable = hiveTables.load(DB_NAME,
TABLE_NAME);
+ Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+ Table anotherIcebergTable = tables.load(DB_NAME, TABLE_NAME);
String fileName = UUID.randomUUID().toString();
DataFile file = DataFiles.builder(icebergTable.spec())
@@ -113,20 +112,22 @@ public class HiveTablesTest extends HiveTableBaseTest {
ExecutorService executorService = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+ AtomicInteger barrier = new AtomicInteger(0);
Tasks.foreach(icebergTable, anotherIcebergTable)
.stopOnFailure().throwFailureWhenFinished()
.executeWith(executorService)
.run(table -> {
for (int numCommittedFiles = 0; numCommittedFiles < 10;
numCommittedFiles++) {
- long commitStartTime = System.currentTimeMillis();
- table.newFastAppend().appendFile(file).commit();
- long commitEndTime = System.currentTimeMillis();
- long commitDuration = commitEndTime - commitStartTime;
- try {
- TimeUnit.MILLISECONDS.sleep(200 - commitDuration);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ while (barrier.get() < numCommittedFiles * 2) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
+
+ table.newFastAppend().appendFile(file).commit();
+ barrier.incrementAndGet();
}
});
diff --git a/hive/src/test/resources/hive-schema-3.1.0.derby.sql
b/hive/src/test/resources/hive-schema-3.1.0.derby.sql
index 043cf07..6411e98 100644
--- a/hive/src/test/resources/hive-schema-3.1.0.derby.sql
+++ b/hive/src/test/resources/hive-schema-3.1.0.derby.sql
@@ -22,7 +22,7 @@ CREATE TABLE "APP"."DBS" (
"NAME" VARCHAR(128),
"OWNER_NAME" VARCHAR(128),
"OWNER_TYPE" VARCHAR(10),
- "CTLG_NAME" VARCHAR(256) NOT NULL
+ "CTLG_NAME" VARCHAR(256)
);
CREATE TABLE "APP"."TBL_PRIVS" ("TBL_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME"
INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128),
"GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE"
VARCHAR(128), "TBL_PRIV" VARCHAR(128), "TBL_ID" BIGINT, "AUTHORIZER"
VARCHAR(128));