This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e89a3b  Use Hive 1.2.1 and add a client pool (#166)
1e89a3b is described below

commit 1e89a3bb11644241bdc7b75b72570c01e96c40c7
Author: Ryan Blue <[email protected]>
AuthorDate: Thu May 2 11:35:46 2019 -0700

    Use Hive 1.2.1 and add a client pool (#166)
---
 build.gradle                                       |  10 +-
 .../java/org/apache/iceberg/hive/ClientPool.java   | 139 +++++++++++++++++++++
 .../org/apache/iceberg/hive/HiveClientPool.java    |  63 ++++++++++
 .../apache/iceberg/hive/HiveTableOperations.java   |  54 ++++----
 .../java/org/apache/iceberg/hive/HiveTables.java   |  37 ++----
 .../apache/iceberg/hive/RuntimeMetaException.java  |  35 ++++++
 .../org/apache/iceberg/hive/HiveTableBaseTest.java | 135 +++++++++-----------
 .../org/apache/iceberg/hive/HiveTablesTest.java    |  43 +++----
 .../src/test/resources/hive-schema-3.1.0.derby.sql |   2 +-
 9 files changed, 371 insertions(+), 147 deletions(-)

diff --git a/build.gradle b/build.gradle
index 6ac2381..ae8920e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -78,7 +78,7 @@ subprojects {
     avroVersion = '1.8.2'
     orcVersion = '1.4.2'
     parquetVersion = '1.10.0'
-    hiveVersion = '3.1.0'
+    hiveVersion = '1.2.1'
 
     jacksonVersion = '2.6.7'
 
@@ -97,6 +97,7 @@ subprojects {
     testCompile 'org.slf4j:slf4j-simple:1.7.5'
     testCompile 'org.mockito:mockito-all:1.10.19'
   }
+
   publishing {
     publications {
       nebula(MavenPublication) {
@@ -207,12 +208,15 @@ project(':iceberg-hive') {
 
     compileOnly "org.apache.avro:avro:$avroVersion"
 
-    compileOnly("org.apache.hive:hive-standalone-metastore:$hiveVersion") {
+    compileOnly("org.apache.hive:hive-metastore:$hiveVersion") {
       exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
 
     testCompile("org.apache.hive:hive-exec:$hiveVersion") {
       exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+      exclude group: 'org.pentaho' // missing dependency
     }
 
     compileOnly("org.apache.hadoop:hadoop-client:$hadoopVersion") {
@@ -368,7 +372,7 @@ project(':iceberg-presto-runtime') {
 
         shadow "org.apache.parquet:parquet-avro:$parquetVersion"
         shadow "org.apache.avro:avro:$avroVersion"
-        shadow ("org.apache.hive:hive-standalone-metastore:$hiveVersion") {
+        shadow ("org.apache.hive:hive-metastore:$hiveVersion") {
             exclude group: 'org.apache.hadoop', module: 'hadoop-common'
 //            exclude group: 'org.apache.orc', module: 'orc-core'
         }
diff --git a/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java 
b/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
new file mode 100644
index 0000000..1baffb2
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+abstract class ClientPool<C, E extends Exception> implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(ClientPool.class);
+
+  private final int poolSize;
+  private final Deque<C> clients;
+  private final Class<E> reconnectExc;
+  private final Object signal = new Object();
+  private volatile int currentSize;
+  private boolean closed;
+  private int runs = 0;
+
+  ClientPool(int poolSize, Class<E> reconnectExc) {
+    this.poolSize = poolSize;
+    this.reconnectExc = reconnectExc;
+    this.clients = new ArrayDeque<>(poolSize);
+    this.currentSize = 0;
+    this.closed = false;
+  }
+
+  interface Action<R, C, E extends Exception> {
+    R run(C client) throws E;
+  }
+
+  public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
+    runs += 1;
+    C client = get();
+    try {
+      return action.run(client);
+
+    } catch (Exception exc) {
+      if (reconnectExc.isInstance(exc)) {
+        try {
+          client = reconnect(client);
+        } catch (Exception ignored) {
+          // if reconnection throws any exception, rethrow the original failure
+          throw reconnectExc.cast(exc);
+        }
+
+        return action.run(client);
+      }
+
+      throw exc;
+
+    } finally {
+      release(client);
+    }
+  }
+
+  protected abstract C newClient();
+
+  protected abstract C reconnect(C client);
+
+  protected abstract void close(C client);
+
+  private C get() throws InterruptedException {
+    Preconditions.checkState(!closed, "Cannot get a client from a closed 
pool");
+    while (true) {
+      if (!clients.isEmpty() || currentSize < poolSize) {
+        synchronized (this) {
+          if (!clients.isEmpty()) {
+            return clients.removeFirst();
+          } else if (currentSize < poolSize) {
+            currentSize += 1;
+            return newClient();
+          }
+        }
+      }
+      synchronized (signal) {
+        // wake every second in case this missed the signal
+        signal.wait(1000);
+      }
+    }
+  }
+
+  private void release(C client) {
+    synchronized (this) {
+      clients.addFirst(client);
+    }
+    synchronized (signal) {
+      signal.notify();
+    }
+  }
+
+  @Override
+  public void close() {
+    this.closed = true;
+    try {
+      while (currentSize > 0) {
+        if (!clients.isEmpty()) {
+          synchronized (this) {
+            if (!clients.isEmpty()) {
+              C client = clients.removeFirst();
+              close(client);
+              currentSize -= 1;
+            }
+          }
+        }
+        if (clients.isEmpty() && currentSize > 0) {
+          // wake every second in case this missed the signal
+          synchronized (signal) {
+            signal.wait(1000);
+          }
+        }
+      }
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while shutting down pool. Some clients may not be 
closed.");
+    }
+  }
+}
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java 
b/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
new file mode 100644
index 0000000..8527555
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+
+class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {
+  private final HiveConf hiveConf;
+
+  HiveClientPool(Configuration conf) {
+    this(conf.getInt("iceberg.hive.client-pool-size", 5), conf);
+  }
+
+  HiveClientPool(int poolSize, Configuration conf) {
+    super(poolSize, TException.class);
+    this.hiveConf = new HiveConf(conf, HiveClientPool.class);
+  }
+
+  @Override
+  protected HiveMetaStoreClient newClient()  {
+    try {
+      return new HiveMetaStoreClient(hiveConf);
+    } catch (MetaException e) {
+      throw new RuntimeMetaException(e, "Failed to connect to Hive Metastore");
+    }
+  }
+
+  @Override
+  protected HiveMetaStoreClient reconnect(HiveMetaStoreClient client) {
+    try {
+      client.reconnect();
+    } catch (MetaException e) {
+      throw new RuntimeMetaException(e, "Failed to reconnect to Hive 
Metastore");
+    }
+    return client;
+  }
+
+  @Override
+  protected void close(HiveMetaStoreClient client) {
+    client.close();
+  }
+}
diff --git 
a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java 
b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index c157756..fa7f1df 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -27,7 +27,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockLevel;
@@ -37,11 +36,8 @@ import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SerdeType;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
@@ -56,20 +52,17 @@ import static java.lang.String.format;
 /**
  * TODO we should be able to extract some more commonalities to 
BaseMetastoreTableOperations to
  * avoid code duplication between this class and Metacat Tables.
- *
- * Note! This class is not thread-safe as {@link ThriftHiveMetastore.Client} 
does not behave
- * correctly in a multi-threaded environment.
  */
 public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOperations.class);
 
-  private final ThriftHiveMetastore.Client metaStoreClient;
+  private final HiveClientPool metaClients;
   private final String database;
   private final String tableName;
 
-  protected HiveTableOperations(Configuration conf, ThriftHiveMetastore.Client 
metaStoreClient, String database, String table) {
+  protected HiveTableOperations(Configuration conf, HiveClientPool 
metaClients, String database, String table) {
     super(conf);
-    this.metaStoreClient = metaStoreClient;
+    this.metaClients = metaClients;
     this.database = database;
     this.tableName = table;
   }
@@ -78,7 +71,7 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
   public TableMetadata refresh() {
     String metadataLocation = null;
     try {
-      final Table table = metaStoreClient.get_table(database, tableName);
+      final Table table = metaClients.run(client -> client.getTable(database, 
tableName));
       String tableType = table.getParameters().get(TABLE_TYPE_PROP);
 
       if (tableType == null || 
!tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE)) {
@@ -96,7 +89,11 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
       }
 
     } catch (TException e) {
-      throw new RuntimeException(format("Failed to get table info from 
metastore %s.%s", database, tableName));
+      throw new RuntimeException(format("Failed to get table info from 
metastore %s.%s", database, tableName), e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during refresh", e);
     }
 
     refreshFromMetadataLocation(metadataLocation);
@@ -108,7 +105,7 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
   public void commit(TableMetadata base, TableMetadata metadata) {
     // if the metadata is already out of date, reject it
     if (base != current()) {
-      throw new CommitFailedException(format("stale table metadata for %s.%s", 
database, tableName));
+      throw new CommitFailedException("Cannot commit: stale table metadata for 
%s.%s", database, tableName);
     }
 
     // if the metadata is not changed, return early
@@ -126,7 +123,7 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
       // TODO add lock heart beating for cases where default lock timeout is 
too low.
       Table tbl;
       if (base != null) {
-        tbl = metaStoreClient.get_table(database, tableName);
+        tbl = metaClients.run(client -> client.getTable(database, tableName));
       } else {
         final long currentTimeMillis = System.currentTimeMillis();
         tbl = new Table(tableName,
@@ -153,13 +150,24 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
       setParameters(newMetadataLocation, tbl);
 
       if (base != null) {
-        metaStoreClient.alter_table(database, tableName, tbl);
+        metaClients.run(client -> {
+          client.alter_table(database, tableName, tbl);
+          return null;
+        });
       } else {
-        metaStoreClient.create_table(tbl);
+        metaClients.run(client -> {
+          client.createTable(tbl);
+          return null;
+        });
       }
       threw = false;
     } catch (TException | UnknownHostException e) {
       throw new RuntimeException(format("Metastore operation failed for 
%s.%s", database, tableName), e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted during commit", e);
+
     } finally {
       if (threw) {
         // if anything went wrong, clean up the uncommitted metadata file
@@ -196,7 +204,6 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
     
storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileInputFormat");
     
storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileOutputFormat");
     SerDeInfo serDeInfo = new SerDeInfo();
-    serDeInfo.setSerdeType(SerdeType.HIVE);
     
serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
     storageDescriptor.setSerdeInfo(serDeInfo);
     return storageDescriptor;
@@ -206,18 +213,18 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
     return schema.columns().stream().map(col -> new FieldSchema(col.name(), 
HiveTypeConverter.convert(col.type()), "")).collect(Collectors.toList());
   }
 
-  private long acquireLock() throws UnknownHostException, TException {
+  private long acquireLock() throws UnknownHostException, TException, 
InterruptedException {
     final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, 
LockLevel.TABLE, database);
     lockComponent.setTablename(tableName);
     final LockRequest lockRequest = new 
LockRequest(Lists.newArrayList(lockComponent),
             System.getProperty("user.name"),
             InetAddress.getLocalHost().getHostName());
-    LockResponse lockResponse = metaStoreClient.lock(lockRequest);
+    LockResponse lockResponse = metaClients.run(client -> 
client.lock(lockRequest));
     LockState state = lockResponse.getState();
     long lockId = lockResponse.getLockid();
     //TODO add timeout
     while (state.equals(LockState.WAITING)) {
-      lockResponse = metaStoreClient.check_lock(new 
CheckLockRequest(lockResponse.getLockid()));
+      lockResponse = metaClients.run(client -> client.checkLock(lockId));
       state = lockResponse.getState();
     }
 
@@ -231,8 +238,11 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations {
   private void unlock(Optional<Long> lockId) {
     if (lockId.isPresent()) {
       try {
-        metaStoreClient.unlock(new UnlockRequest(lockId.get()));
-      } catch (TException e) {
+        metaClients.run(client -> {
+          client.unlock(lockId.get());
+          return null;
+        });
+      } catch (Exception e) {
         throw new RuntimeException(format("Failed to unlock %s.%s", database, 
tableName) , e);
       }
     }
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTables.java 
b/hive/src/main/java/org/apache/iceberg/hive/HiveTables.java
index ea3d32f..7332264 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTables.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTables.java
@@ -16,33 +16,23 @@
 package org.apache.iceberg.hive;
 
 import com.google.common.base.Splitter;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.BaseMetastoreTables;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CLIENT_SOCKET_TIMEOUT;
-import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.THRIFT_URIS;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
 
-public class HiveTables extends BaseMetastoreTables {
+public class HiveTables extends BaseMetastoreTables implements Closeable {
   private static final Splitter DOT = Splitter.on('.').limit(2);
-  private Configuration conf;
+  private final HiveClientPool clients;
 
   public HiveTables(Configuration conf) {
     super(conf);
-    this.conf = conf;
+    this.clients = new HiveClientPool(2, conf);
   }
 
   @Override
@@ -70,18 +60,11 @@ public class HiveTables extends BaseMetastoreTables {
 
   @Override
   public BaseMetastoreTableOperations newTableOps(Configuration conf, String 
database, String table) {
-    return new HiveTableOperations(conf, getClient(), database, table);
+    return new HiveTableOperations(conf, clients, database, table);
   }
 
-  private ThriftHiveMetastore.Client getClient() {
-    final URI metastoreUri = URI.create(MetastoreConf.getAsString(conf, 
THRIFT_URIS));
-    final int socketTimeOut = (int) MetastoreConf.getTimeVar(conf, 
CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
-    TTransport transport = new TSocket(metastoreUri.getHost(), 
metastoreUri.getPort(), socketTimeOut);
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      throw new RuntimeException("failed to open socket for " + metastoreUri + 
" with timeoutMillis " + socketTimeOut);
-    }
-    return new ThriftHiveMetastore.Client(new TBinaryProtocol(transport));
+  @Override
+  public void close() {
+    clients.close();
   }
 }
diff --git 
a/hive/src/main/java/org/apache/iceberg/hive/RuntimeMetaException.java 
b/hive/src/main/java/org/apache/iceberg/hive/RuntimeMetaException.java
new file mode 100644
index 0000000..94e3e7b
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/RuntimeMetaException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * Exception used to wrap {@link MetaException} as a {@link RuntimeException} 
and add context.
+ */
+public class RuntimeMetaException extends RuntimeException {
+  public RuntimeMetaException(MetaException cause) {
+    super(cause);
+  }
+
+  public RuntimeMetaException(MetaException cause, String message, Object... 
args) {
+    super(String.format(message, args), cause);
+  }
+}
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java 
b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
index 4b6fc15..0f43a4f 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
@@ -20,7 +20,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.Reader;
-import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -41,36 +40,30 @@ import org.apache.hadoop.hive.metastore.IHMSHandler;
 import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
 import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
 import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
-import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
-import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 
 import static java.nio.file.Files.createTempDirectory;
 import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute;
 import static java.nio.file.attribute.PosixFilePermissions.fromString;
-import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.CONNECT_URL_KEY;
-import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.THRIFT_URIS;
-import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.WAREHOUSE;
 import static org.apache.iceberg.PartitionSpec.builderFor;
 import static org.apache.iceberg.TableMetadataParser.getFileExtension;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 
-class HiveTableBaseTest {
+public class HiveTableBaseTest {
 
   static final String DB_NAME = "hivedb";
   static final String TABLE_NAME =  "tbl";
@@ -84,37 +77,34 @@ class HiveTableBaseTest {
 
   private static final PartitionSpec partitionSpec = 
builderFor(schema).identity("id").build();
 
-  Configuration hiveConf;
-  HiveMetaStoreClient metastoreClient;
-  private File hiveLocalDir;
+  private static HiveConf hiveConf;
+  private static File hiveLocalDir;
 
-  private ExecutorService executorService;
-  private TServer server;
+  private static ExecutorService executorService;
+  private static TServer server;
 
-  @Before
-  public void setup() throws IOException,
-          TException,
-          InvocationTargetException,
-          NoSuchMethodException,
-          IllegalAccessException,
-          NoSuchFieldException, SQLException {
-    this.executorService = Executors.newSingleThreadExecutor();
-    hiveLocalDir = createTempDirectory("hive", 
asFileAttribute(fromString("rwxrwxrwx"))).toFile();
+  static HiveMetaStoreClient metastoreClient;
+
+  @BeforeClass
+  public static void startMetastore() throws Exception {
+    HiveTableBaseTest.executorService = Executors.newSingleThreadExecutor();
+    HiveTableBaseTest.hiveLocalDir = createTempDirectory("hive", 
asFileAttribute(fromString("rwxrwxrwx"))).toFile();
     File derbyLogFile = new File(hiveLocalDir, "derby.log");
     System.setProperty("derby.stream.error.file", 
derbyLogFile.getAbsolutePath());
     setupDB("jdbc:derby:" + getDerbyPath() + ";create=true");
 
-    this.server = thriftServer();
+    HiveTableBaseTest.server = thriftServer();
     executorService.submit(() -> server.serve());
 
-    this.metastoreClient = new HiveMetaStoreClient(this.hiveConf);
-    createIfNotExistsCatalog("hive");
-    this.metastoreClient.createDatabase(new Database(DB_NAME, "description", 
getDBPath(), new HashMap<>()));
-    new HiveTables(this.hiveConf).create(schema, partitionSpec, DB_NAME, 
TABLE_NAME);
+    HiveTableBaseTest.metastoreClient = new HiveMetaStoreClient(hiveConf);
+    metastoreClient.createDatabase(new Database(DB_NAME, "description", 
getDBPath(), new HashMap<>()));
   }
 
-  @After
-  public void cleanup() {
+  @AfterClass
+  public static void stopMetastore() {
+    metastoreClient.close();
+    HiveTableBaseTest.metastoreClient = null;
+
     if (server != null) {
       server.stop();
     }
@@ -126,37 +116,44 @@ class HiveTableBaseTest {
     }
   }
 
-  private HiveConf hiveConf(Configuration conf, int port) {
-    final HiveConf hiveConf = new HiveConf(conf, this.getClass());
-    hiveConf.set(THRIFT_URIS.getVarname(), "thrift://localhost:" + port);
-    hiveConf.set(WAREHOUSE.getVarname(), "file:" + 
hiveLocalDir.getAbsolutePath());
-    hiveConf.set(WAREHOUSE.getHiveName(), "file:" + 
hiveLocalDir.getAbsolutePath());
-    hiveConf.set(CONNECT_URL_KEY.getVarname(), "jdbc:derby:" + getDerbyPath() 
+ ";create=true");
+  HiveTables tables;
+
+  @Before
+  public void createTestTable() throws Exception {
+    this.tables = new HiveTables(hiveConf);
+    tables.create(schema, partitionSpec, DB_NAME, TABLE_NAME);
+  }
+
+  @After
+  public void dropTestTable() throws Exception {
+    metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+    tables.close();
+    this.tables = null;
+  }
+
+  private static HiveConf hiveConf(Configuration conf, int port) {
+    final HiveConf hiveConf = new HiveConf(conf, HiveTableBaseTest.class);
+    hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, 
"thrift://localhost:" + port);
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + 
hiveLocalDir.getAbsolutePath());
     return hiveConf;
   }
 
-  private String getDerbyPath() {
+  private static String getDerbyPath() {
     final File metastore_db = new File(hiveLocalDir, "metastore_db");
     return metastore_db.getPath();
   }
 
-  private TServer thriftServer() throws IOException,
-          TTransportException,
-          MetaException,
-          InvocationTargetException,
-          NoSuchMethodException,
-          IllegalAccessException,
-          NoSuchFieldException {
-    final TServerSocketKeepAlive socket = new TServerSocketKeepAlive(new 
TServerSocket(0));
-    this.hiveConf = hiveConf(new Configuration(), 
socket.getServerSocket().getLocalPort());
-    HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new 
db based metaserver", hiveConf);
-    IHMSHandler handler = RetryingHMSHandler.getProxy(hiveConf, baseHandler, 
true);
-    final TTransportFactory transportFactory = new TTransportFactory();
-    final TSetIpAddressProcessor<IHMSHandler> processor = new 
TSetIpAddressProcessor<>(handler);
+  private static TServer thriftServer() throws Exception {
+    TServerSocket socket = new TServerSocket(0);
+    HiveTableBaseTest.hiveConf = hiveConf(new Configuration(), 
socket.getServerSocket().getLocalPort());
+    HiveConf serverConf = new HiveConf(hiveConf);
+    serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, 
"jdbc:derby:" + getDerbyPath() + ";create=true");
+    HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new 
db based metaserver", serverConf);
+    IHMSHandler handler = RetryingHMSHandler.getProxy(serverConf, baseHandler, 
false);
 
     TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket)
-            .processor(processor)
-            .transportFactory(transportFactory)
+            .processor(new TSetIpAddressProcessor<>(handler))
+            .transportFactory(new TTransportFactory())
             .protocolFactory(new TBinaryProtocol.Factory())
             .minWorkerThreads(3)
             .maxWorkerThreads(5);
@@ -164,55 +161,47 @@ class HiveTableBaseTest {
     return new TThreadPoolServer(args);
   }
 
-  private void setupDB(String dbURL) throws SQLException, IOException {
+  private static void setupDB(String dbURL) throws SQLException, IOException {
     Connection connection = DriverManager.getConnection(dbURL);
     ScriptRunner scriptRunner = new ScriptRunner(connection, true, true);
 
-    URL hiveSqlScript = 
getClass().getClassLoader().getResource("hive-schema-3.1.0.derby.sql");
-    Reader reader = new BufferedReader(new FileReader(new 
File(hiveSqlScript.getFile())));
-    scriptRunner.runScript(reader);
-  }
-
-  private void createIfNotExistsCatalog(String catalogName) throws TException {
-    try {
-      metastoreClient.getCatalog(catalogName);
-    } catch(NoSuchObjectException e) {
-      String catalogPath = Paths.get(hiveLocalDir.getAbsolutePath(), 
catalogName + ".catalog").toString();
-      metastoreClient.createCatalog(new Catalog(catalogName, catalogPath));
+    URL hiveSqlScript = 
HiveTableBaseTest.class.getClassLoader().getResource("hive-schema-3.1.0.derby.sql");
+    try (Reader reader = new BufferedReader(new FileReader(new 
File(hiveSqlScript.getFile())))) {
+      scriptRunner.runScript(reader);
     }
   }
 
-  private String getDBPath() {
+  private static String getDBPath() {
    return Paths.get(hiveLocalDir.getAbsolutePath(), DB_NAME + 
".db").toAbsolutePath().toString();
   }
 
-  String getTableBasePath(String tableName) {
+  private static String getTableBasePath(String tableName) {
     return Paths.get(getDBPath(), tableName).toAbsolutePath().toString();
   }
 
-  String getTableLocation(String tableName) {
+  protected static String getTableLocation(String tableName) {
     return new Path("file", null, 
Paths.get(getTableBasePath(tableName)).toString()).toString();
   }
 
-  String metadataLocation(String tableName) {
+  private static String metadataLocation(String tableName) {
     return Paths.get(getTableBasePath(tableName), "metadata").toString();
   }
 
-  private List<String> metadataFiles(String tableName) {
+  private static List<String> metadataFiles(String tableName) {
     return Arrays.stream(new File(metadataLocation(tableName)).listFiles())
             .map(File::getAbsolutePath)
             .collect(Collectors.toList());
   }
 
-  List<String> metadataVersionFiles(String tableName) {
+  protected static List<String> metadataVersionFiles(String tableName) {
     return filterByExtension(tableName, getFileExtension(hiveConf));
   }
 
-  List<String> manifestFiles(String tableName) {
+  protected static List<String> manifestFiles(String tableName) {
     return filterByExtension(tableName, ".avro");
   }
 
-  private List<String> filterByExtension(String tableName, String extension) {
+  private static List<String> filterByExtension(String tableName, String 
extension) {
     return metadataFiles(tableName)
             .stream()
             .filter(f -> f.endsWith(extension))
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java 
b/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
index 222565c..607e060 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
@@ -22,12 +22,12 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Tasks;
@@ -43,7 +43,7 @@ public class HiveTablesTest extends HiveTableBaseTest {
   @Test
   public void testCreate() throws TException {
     // Table should be created in hive metastore
-    final Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+    final org.apache.hadoop.hive.metastore.api.Table table = 
metastoreClient.getTable(DB_NAME, TABLE_NAME);
 
     // check parameters are in expected state
     final Map<String, String> parameters = table.getParameters();
@@ -61,25 +61,25 @@ public class HiveTablesTest extends HiveTableBaseTest {
     Assert.assertEquals(1, metadataVersionFiles(TABLE_NAME).size());
     Assert.assertEquals(0, manifestFiles(TABLE_NAME).size());
 
-    final org.apache.iceberg.Table icebergTable = new 
HiveTables(hiveConf).load(DB_NAME, TABLE_NAME);
+    final Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
     // Iceberg schema should match the loaded table
     Assert.assertEquals(schema.asStruct(), icebergTable.schema().asStruct());
   }
 
   @Test
   public void testExistingTableUpdate() throws TException {
-    org.apache.iceberg.Table icebergTable = new 
HiveTables(hiveConf).load(DB_NAME, TABLE_NAME);
+    Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
     // add a column
     icebergTable.updateSchema().addColumn("data", 
Types.LongType.get()).commit();
 
-    icebergTable = new HiveTables(hiveConf).load(DB_NAME, TABLE_NAME);
+    icebergTable = tables.load(DB_NAME, TABLE_NAME);
 
     // Only 2 snapshot files should exist and no manifests should exist
     Assert.assertEquals(2, metadataVersionFiles(TABLE_NAME).size());
     Assert.assertEquals(0, manifestFiles(TABLE_NAME).size());
     Assert.assertEquals(altered.asStruct(), icebergTable.schema().asStruct());
 
-    final Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+    final org.apache.hadoop.hive.metastore.api.Table table = 
metastoreClient.getTable(DB_NAME, TABLE_NAME);
     final List<String> hiveColumns = table.getSd().getCols().stream().map(f -> 
f.getName()).collect(Collectors.toList());
     final List<String> icebergColumns = altered.columns().stream().map(f -> 
f.name()).collect(Collectors.toList());
     Assert.assertEquals(icebergColumns, hiveColumns);
@@ -87,9 +87,9 @@ public class HiveTablesTest extends HiveTableBaseTest {
 
   @Test(expected = CommitFailedException.class)
   public void testFailure() throws TException {
-    org.apache.iceberg.Table icebergTable = new 
HiveTables(hiveConf).load(DB_NAME, TABLE_NAME);
-    final Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
-    final String dummyLocation = "dummylocation";
+    Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+    org.apache.hadoop.hive.metastore.api.Table table = 
metastoreClient.getTable(DB_NAME, TABLE_NAME);
+    String dummyLocation = "dummylocation";
     table.getParameters().put(METADATA_LOCATION_PROP, dummyLocation);
     metastoreClient.alter_table(DB_NAME, TABLE_NAME, table);
     icebergTable.updateSchema()
@@ -99,9 +99,8 @@ public class HiveTablesTest extends HiveTableBaseTest {
 
   @Test
   public void testConcurrentFastAppends() {
-    HiveTables hiveTables = new HiveTables(hiveConf);
-    org.apache.iceberg.Table icebergTable = hiveTables.load(DB_NAME, 
TABLE_NAME);
-    org.apache.iceberg.Table anotherIcebergTable = hiveTables.load(DB_NAME, 
TABLE_NAME);
+    Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+    Table anotherIcebergTable = tables.load(DB_NAME, TABLE_NAME);
 
     String fileName = UUID.randomUUID().toString();
     DataFile file = DataFiles.builder(icebergTable.spec())
@@ -113,20 +112,22 @@ public class HiveTablesTest extends HiveTableBaseTest {
     ExecutorService executorService = MoreExecutors.getExitingExecutorService(
       (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
 
+    AtomicInteger barrier = new AtomicInteger(0);
     Tasks.foreach(icebergTable, anotherIcebergTable)
       .stopOnFailure().throwFailureWhenFinished()
       .executeWith(executorService)
       .run(table -> {
         for (int numCommittedFiles = 0; numCommittedFiles < 10; 
numCommittedFiles++) {
-          long commitStartTime = System.currentTimeMillis();
-          table.newFastAppend().appendFile(file).commit();
-          long commitEndTime = System.currentTimeMillis();
-          long commitDuration = commitEndTime - commitStartTime;
-          try {
-            TimeUnit.MILLISECONDS.sleep(200 - commitDuration);
-          } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+          while (barrier.get() < numCommittedFiles * 2) {
+            try {
+              Thread.sleep(10);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
           }
+
+          table.newFastAppend().appendFile(file).commit();
+          barrier.incrementAndGet();
         }
       });
 
diff --git a/hive/src/test/resources/hive-schema-3.1.0.derby.sql 
b/hive/src/test/resources/hive-schema-3.1.0.derby.sql
index 043cf07..6411e98 100644
--- a/hive/src/test/resources/hive-schema-3.1.0.derby.sql
+++ b/hive/src/test/resources/hive-schema-3.1.0.derby.sql
@@ -22,7 +22,7 @@ CREATE TABLE "APP"."DBS" (
   "NAME" VARCHAR(128),
   "OWNER_NAME" VARCHAR(128),
   "OWNER_TYPE" VARCHAR(10),
-  "CTLG_NAME" VARCHAR(256) NOT NULL
+  "CTLG_NAME" VARCHAR(256)
 );
 
 CREATE TABLE "APP"."TBL_PRIVS" ("TBL_GRANT_ID" BIGINT NOT NULL, "CREATE_TIME" 
INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), 
"GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" 
VARCHAR(128), "TBL_PRIV" VARCHAR(128), "TBL_ID" BIGINT, "AUTHORIZER" 
VARCHAR(128));

Reply via email to