PHOENIX-2743 HivePhoenixHandler for big-big join with Signed-off-by: Josh Elser <els...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/521b8192 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/521b8192 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/521b8192 Branch: refs/heads/4.x-HBase-0.98 Commit: 521b8192806e6df3196fcf1b1eb933e33decbee7 Parents: 87f47b8 Author: Sergey Soldatov <sergey.solda...@gmail.com> Authored: Mon Apr 18 23:36:36 2016 -0700 Committer: Josh Elser <els...@apache.org> Committed: Tue Apr 19 21:18:56 2016 -0400 ---------------------------------------------------------------------- phoenix-hive/pom.xml | 158 +++ .../apache/phoenix/hive/HivePhoenixStoreIT.java | 303 ++++ .../org/apache/phoenix/hive/HiveTestUtil.java | 1291 ++++++++++++++++++ .../apache/phoenix/hive/PhoenixMetaHook.java | 246 ++++ .../phoenix/hive/PhoenixRecordUpdater.java | 336 +++++ .../org/apache/phoenix/hive/PhoenixRow.java | 64 + .../org/apache/phoenix/hive/PhoenixRowKey.java | 69 + .../org/apache/phoenix/hive/PhoenixSerDe.java | 159 +++ .../apache/phoenix/hive/PhoenixSerializer.java | 169 +++ .../phoenix/hive/PhoenixStorageHandler.java | 212 +++ .../PhoenixStorageHandlerConstants.java | 108 ++ .../hive/mapreduce/PhoenixInputFormat.java | 269 ++++ .../hive/mapreduce/PhoenixInputSplit.java | 160 +++ .../hive/mapreduce/PhoenixOutputFormat.java | 112 ++ .../hive/mapreduce/PhoenixRecordReader.java | 216 +++ .../hive/mapreduce/PhoenixRecordWriter.java | 355 +++++ .../hive/mapreduce/PhoenixResultWritable.java | 211 +++ .../AbstractPhoenixObjectInspector.java | 59 + .../PhoenixBinaryObjectInspector.java | 58 + .../PhoenixBooleanObjectInspector.java | 50 + .../PhoenixByteObjectInspector.java | 54 + .../PhoenixCharObjectInspector.java | 51 + .../PhoenixDateObjectInspector.java | 63 + .../PhoenixDecimalObjectInspector.java | 63 + .../PhoenixDoubleObjectInspector.java | 54 + .../PhoenixFloatObjectInspector.java | 55 + .../PhoenixIntObjectInspector.java | 51 + .../PhoenixListObjectInspector.java | 105 ++ .../PhoenixLongObjectInspector.java | 51 + .../PhoenixObjectInspectorFactory.java | 148 ++ .../PhoenixShortObjectInspector.java | 51 + .../PhoenixStringObjectInspector.java | 72 + .../PhoenixTimestampObjectInspector.java | 61 + .../hive/ppd/PhoenixPredicateDecomposer.java | 82 ++ .../ppd/PhoenixPredicateDecomposerManager.java | 83 ++ .../hive/ql/index/IndexPredicateAnalyzer.java | 523 +++++++ .../hive/ql/index/IndexSearchCondition.java | 143 ++ .../hive/ql/index/PredicateAnalyzerFactory.java | 40 + .../phoenix/hive/query/PhoenixQueryBuilder.java | 760 +++++++++++ .../hive/util/PhoenixConnectionUtil.java | 97 ++ .../hive/util/PhoenixStorageHandlerUtil.java | 278 ++++ .../apache/phoenix/hive/util/PhoenixUtil.java | 210 +++ phoenix-server/pom.xml | 6 + pom.xml | 39 +- 44 files changed, 7743 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-hive/pom.xml b/phoenix-hive/pom.xml new file mode 100644 index 0000000..f815b56 --- /dev/null +++ b/phoenix-hive/pom.xml @@ -0,0 +1,158 @@ +<?xml version='1.0'?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix</artifactId> + <version>4.8.0-HBase-0.98-SNAPSHOT</version> + </parent> + <artifactId>phoenix-hive</artifactId> + <name>Phoenix - Hive</name> + + <dependencies> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-cli</artifactId> + <version>${hive.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-testing-util</artifactId> + <scope>test</scope> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-it</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>${maven-dependency-plugin.version}</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-jar-with-dependencies</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java new file mode 100644 index 0000000..a707a06 --- /dev/null +++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test class to run all Hive Phoenix integration tests against a MINI Map-Reduce cluster. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class HivePhoenixStoreIT { + + private static final Log LOG = LogFactory.getLog(HivePhoenixStoreIT.class); + private static HBaseTestingUtility hbaseTestUtil; + private static String zkQuorum; + private static Connection conn; + private static Configuration conf; + private static HiveTestUtil qt; + private static String hiveOutputDir; + private static String hiveLogDir; + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + String hadoopConfDir = System.getenv("HADOOP_CONF_DIR"); + if (null != hadoopConfDir && !hadoopConfDir.isEmpty()) { + LOG.warn("WARNING: HADOOP_CONF_DIR is set in the environment which may cause " + + "issues with test execution via MiniDFSCluster"); + } + hbaseTestUtil = new HBaseTestingUtility(); + conf = hbaseTestUtil.getConfiguration(); + setUpConfigForMiniCluster(conf); + conf.set(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + hiveOutputDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_output").toString(); + File outputDir = new File(hiveOutputDir); + outputDir.mkdirs(); + hiveLogDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_log").toString(); + File logDir = new File(hiveLogDir); + logDir.mkdirs(); + // Setup Hive mini Server + Path testRoot = hbaseTestUtil.getDataTestDir(); + System.setProperty("test.tmp.dir", testRoot.toString()); + System.setProperty("test.warehouse.dir", (new Path(testRoot, "warehouse")).toString()); + + HiveTestUtil.MiniClusterType miniMR = HiveTestUtil.MiniClusterType.mr; + try { + qt = new HiveTestUtil(hiveOutputDir, hiveLogDir, miniMR, null); + } catch (Exception e) { + LOG.error("Unexpected exception in setup", e); + fail("Unexpected exception in setup"); + } + + //Start HBase cluster + hbaseTestUtil.startMiniCluster(3); + MiniDFSCluster x = hbaseTestUtil.getDFSCluster(); + + Class.forName(PhoenixDriver.class.getName()); + zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum, props); + // Setup Hive Output Folder + + Statement stmt = conn.createStatement(); + stmt.execute("create table t(a integer primary key,b varchar)"); + } + + /** + * Create a table with two column, insert 1 row, check that phoenix table is created and + * the row is there + * + * @throws Exception + */ + @Test + public void simpleTest() throws Exception { + String testName = "simpleTest"; + // create a dummy outfile under log folder + hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); + createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); + createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString()); + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE phoenix_table(ID STRING, SALARY STRING)" + HiveTestUtil.CRLF + + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil + .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF + + " 'phoenix.table.name'='phoenix_table'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster() + .getClientPort() + "', 'phoenix.rowkeys'='id');"); + sb.append("INSERT INTO TABLE phoenix_table" + HiveTestUtil.CRLF + + "VALUES ('10', '1000');" + HiveTestUtil.CRLF); + String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); + createFile(sb.toString(), fullPath); + runTest(testName, fullPath); + + String phoenixQuery = "SELECT * FROM phoenix_table"; + PreparedStatement statement = conn.prepareStatement(phoenixQuery); + ResultSet rs = statement.executeQuery(); + assert (rs.getMetaData().getColumnCount() == 2); + assertTrue(rs.next()); + assert (rs.getString(1).equals("10")); + assert (rs.getString(2).equals("1000")); + + } + + /** + * Datatype Test + * + * @throws Exception + */ + @Test + public void dataTypeTest() throws Exception { + String testName = "dataTypeTest"; + // create a dummy outfile under log folder + hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); + createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); + createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString()); + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE phoenix_datatype(ID int, description STRING, ts TIMESTAMP, db " + + "DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil + .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF + + " 'phoenix.hbase.table.name'='phoenix_datatype'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster() + .getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id');"); + sb.append("INSERT INTO TABLE phoenix_datatype" + HiveTestUtil.CRLF + + "VALUES (10, \"foodesc\",\"2013-01-05 01:01:01\",200,2.0,-1);" + HiveTestUtil.CRLF); + String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); + createFile(sb.toString(), fullPath); + runTest(testName, fullPath); + + String phoenixQuery = "SELECT * FROM phoenix_datatype"; + PreparedStatement statement = conn.prepareStatement(phoenixQuery); + ResultSet rs = statement.executeQuery(); + assert (rs.getMetaData().getColumnCount() == 6); + while (rs.next()) { + assert (rs.getInt(1) == 10); + assert (rs.getString(2).equalsIgnoreCase("foodesc")); + /* Need a way how to correctly handle timestamp since Hive's implementation uses + time zone information but Phoenix doesn't. + */ + //assert(rs.getTimestamp(3).equals(Timestamp.valueOf("2013-01-05 02:01:01"))); + assert (rs.getDouble(4) == 200); + assert (rs.getFloat(5) == 2.0); + assert (rs.getInt(6) == -1); + } + } + + /** + * Datatype Test + * + * @throws Exception + */ + @Test + public void MultiKey() throws Exception { + String testName = "MultiKey"; + // create a dummy outfile under log folder + hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out")); + createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString()); + createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString()); + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE phoenix_MultiKey(ID int, ID2 String,description STRING, ts " + + "TIMESTAMP, db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF + + " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil + .CRLF + + " TBLPROPERTIES(" + HiveTestUtil.CRLF + + " 'phoenix.hbase.table.name'='phoenix_MultiKey'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF + + " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster() + .getClientPort() + "'," + HiveTestUtil.CRLF + + " 'phoenix.rowkeys'='id,id2');"); + sb.append("INSERT INTO TABLE phoenix_MultiKey" + HiveTestUtil.CRLF + + "VALUES (10, \"part2\",\"foodesc\",\"2013-01-05 01:01:01\",200,2.0,-1);" + + HiveTestUtil.CRLF); + String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString(); + createFile(sb.toString(), fullPath); + runTest(testName, fullPath); + + String phoenixQuery = "SELECT * FROM phoenix_MultiKey"; + PreparedStatement statement = conn.prepareStatement(phoenixQuery); + ResultSet rs = statement.executeQuery(); + assert (rs.getMetaData().getColumnCount() == 7); + while (rs.next()) { + assert (rs.getInt(1) == 10); + assert (rs.getString(2).equalsIgnoreCase("part2")); + assert (rs.getString(3).equalsIgnoreCase("foodesc")); + //assert(rs.getTimestamp(4).equals(Timestamp.valueOf("2013-01-05 02:01:01"))); + assert (rs.getDouble(5) == 200); + assert (rs.getFloat(6) == 2.0); + assert (rs.getInt(7) == -1); + } + } + + + private void runTest(String fname, String fpath) throws Exception { + long startTime = System.currentTimeMillis(); + try { + LOG.info("Begin query: " + fname); + qt.addFile(fpath); + + if (qt.shouldBeSkipped(fname)) { + LOG.info("Test " + fname + " skipped"); + return; + } + + qt.cliInit(fname); + qt.clearTestSideEffects(); + int ecode = qt.executeClient(fname); + if (ecode != 0) { + qt.failed(ecode, fname, null); + } + + ecode = qt.checkCliDriverResults(fname); + if (ecode != 0) { + qt.failedDiff(ecode, fname, null); + } + qt.clearPostTestEffects(); + + } catch (Throwable e) { + qt.failed(e, fname, null); + } + + long elapsedTime = System.currentTimeMillis() - startTime; + LOG.info("Done query: " + fname + " elapsedTime=" + elapsedTime / 1000 + "s"); + assertTrue("Test passed", true); + } + + private void createFile(String content, String fullName) throws IOException { + FileUtils.write(new File(fullName), content); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (qt != null) { + try { + qt.shutdown(); + } catch (Exception e) { + LOG.error("Unexpected exception in setup", e); + fail("Unexpected exception in tearDown"); + } + } + try { + conn.close(); + } finally { + try { + PhoenixDriver.INSTANCE.close(); + } finally { + try { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + } finally { + hbaseTestUtil.shutdownMiniCluster(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java new file mode 100644 index 0000000..57722f8 --- /dev/null +++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java @@ -0,0 +1,1291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive; + +import com.google.common.collect.ImmutableList; +import junit.framework.Assert; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hive.cli.CliDriver; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.common.io.CachingPrintStream; +import org.apache.hadoop.hive.common.io.DigestPrintStream; +import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream; +import org.apache.hadoop.hive.common.io.SortPrintStream; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.ParseDriver; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.util.Shell; +import org.apache.hive.common.util.StreamPrinter; +import org.apache.tools.ant.BuildException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.io.StringWriter; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Deque; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * HiveTestUtil cloned from Hive QTestUtil. Can be outdated and may require update once a problem + * found. + */ +public class HiveTestUtil { + + public static final String UTF_8 = "UTF-8"; + private static final Log LOG = LogFactory.getLog("HiveTestUtil"); + private static final String QTEST_LEAVE_FILES = "QTEST_LEAVE_FILES"; + public static final String DEFAULT_DATABASE_NAME = "default"; + + private String testWarehouse; + private final String testFiles; + protected final String outDir; + protected final String logDir; + private final TreeMap<String, String> qMap; + private final Set<String> qSkipSet; + private final Set<String> qSortSet; + private final Set<String> qSortQuerySet; + private final Set<String> qHashQuerySet; + private final Set<String> qSortNHashQuerySet; + private final Set<String> qJavaVersionSpecificOutput; + private static final String SORT_SUFFIX = ".sorted"; + private static MiniClusterType clusterType = MiniClusterType.none; + private ParseDriver pd; + protected Hive db; + protected HiveConf conf; + private BaseSemanticAnalyzer sem; + protected final boolean overWrite; + private CliDriver cliDriver; + private HadoopShims.MiniMrShim mr = null; + private HadoopShims.MiniDFSShim dfs = null; + private String hadoopVer = null; + private HiveTestSetup setup = null; + private boolean isSessionStateStarted = false; + private static final String javaVersion = getJavaVersion(); + + private String initScript = ""; + private String cleanupScript = ""; + + public HiveConf getConf() { + return conf; + } + + public boolean deleteDirectory(File path) { + if (path.exists()) { + File[] files = path.listFiles(); + for (File file : files) { + if (file.isDirectory()) { + deleteDirectory(file); + } else { + file.delete(); + } + } + } + return (path.delete()); + } + + public void copyDirectoryToLocal(Path src, Path dest) throws Exception { + + FileSystem srcFs = src.getFileSystem(conf); + FileSystem destFs = dest.getFileSystem(conf); + if (srcFs.exists(src)) { + FileStatus[] files = srcFs.listStatus(src); + for (FileStatus file : files) { + String name = file.getPath().getName(); + Path dfs_path = file.getPath(); + Path local_path = new Path(dest, name); + + if (file.isDir()) { + if (!destFs.exists(local_path)) { + destFs.mkdirs(local_path); + } + copyDirectoryToLocal(dfs_path, local_path); + } else { + srcFs.copyToLocalFile(dfs_path, local_path); + } + } + } + } + + static Pattern mapTok = Pattern.compile("(\\.?)(.*)_map_(.*)"); + static Pattern reduceTok = Pattern.compile("(.*)(reduce_[^\\.]*)((\\..*)?)"); + + public void normalizeNames(File path) throws Exception { + if (path.isDirectory()) { + File[] files = path.listFiles(); + for (File file : files) { + normalizeNames(file); + } + } else { + Matcher m = reduceTok.matcher(path.getName()); + if (m.matches()) { + String name = m.group(1) + "reduce" + m.group(3); + path.renameTo(new File(path.getParent(), name)); + } else { + m = mapTok.matcher(path.getName()); + if (m.matches()) { + String name = m.group(1) + "map_" + m.group(3); + path.renameTo(new File(path.getParent(), name)); + } + } + } + } + + public String getOutputDirectory() { + return outDir; + } + + public String getLogDirectory() { + return logDir; + } + + private String getHadoopMainVersion(String input) { + if (input == null) { + return null; + } + Pattern p = Pattern.compile("^(\\d+\\.\\d+).*"); + Matcher m = p.matcher(input); + if (m.matches()) { + return m.group(1); + } + return null; + } + + public void initConf() throws Exception { + // Plug verifying metastore in for testing. + conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, + "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); + + if (mr != null) { + assert dfs != null; + + mr.setupConfiguration(conf); + + // set fs.default.name to the uri of mini-dfs + String dfsUriString = WindowsPathUtil.getHdfsUriString(dfs.getFileSystem().getUri() + .toString()); + conf.setVar(HiveConf.ConfVars.HADOOPFS, dfsUriString); + // hive.metastore.warehouse.dir needs to be set relative to the mini-dfs + conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, + (new Path(dfsUriString, + "/build/ql/test/data/warehouse/")).toString()); + } + + // Windows paths should be converted after MiniMrShim.setupConfiguration() + // since setupConfiguration may overwrite configuration values. + if (Shell.WINDOWS) { + WindowsPathUtil.convertPathsFromWindowsToHdfs(conf); + } + } + + public enum MiniClusterType { + mr, + tez, + none; + + public static MiniClusterType valueForString(String type) { + if (type.equals("miniMR")) { + return mr; + } else if (type.equals("tez")) { + return tez; + } else { + return none; + } + } + } + + public HiveTestUtil(String outDir, String logDir, MiniClusterType clusterType, String hadoopVer) + throws Exception { + this(outDir, logDir, clusterType, null, hadoopVer); + } + + public HiveTestUtil(String outDir, String logDir, MiniClusterType clusterType, String confDir, + String hadoopVer) + throws Exception { + this.outDir = outDir; + this.logDir = logDir; + if (confDir != null && !confDir.isEmpty()) { + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + + "/hive-site.xml")); + LOG.info("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + } + conf = new HiveConf(); + String tmpBaseDir = System.getProperty("test.tmp.dir"); + if (tmpBaseDir == null || tmpBaseDir == "") { + tmpBaseDir = System.getProperty("java.io.tmpdir"); + } + String metaStoreURL = "jdbc:derby:" + tmpBaseDir + File.separator + "metastore_dbtest;" + + "create=true"; + conf.set(ConfVars.METASTORECONNECTURLKEY.varname, metaStoreURL); + System.setProperty(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, metaStoreURL); + + //set where derby logs + File derbyLogFile = new File(tmpBaseDir + "/derby.log"); + derbyLogFile.createNewFile(); + System.setProperty("derby.stream.error.file", derbyLogFile.getPath()); + + this.hadoopVer = getHadoopMainVersion(hadoopVer); + qMap = new TreeMap<String, String>(); + qSkipSet = new HashSet<String>(); + qSortSet = new HashSet<String>(); + qSortQuerySet = new HashSet<String>(); + qHashQuerySet = new HashSet<String>(); + qSortNHashQuerySet = new HashSet<String>(); + qJavaVersionSpecificOutput = new HashSet<String>(); + this.clusterType = clusterType; + + HadoopShims shims = ShimLoader.getHadoopShims(); + int numberOfDataNodes = 4; + + if (clusterType != MiniClusterType.none) { + dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null); + FileSystem fs = dfs.getFileSystem(); + String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString()); + if (clusterType == MiniClusterType.tez) { + mr = shims.getMiniTezCluster(conf, 4, uriString, 1); + } else { + mr = shims.getMiniMrCluster(conf, 4, uriString, 1); + } + } + + initConf(); + + // Use the current directory if it is not specified + String dataDir = conf.get("test.data.files"); + if (dataDir == null) { + dataDir = new File(".").getAbsolutePath() + "/data/files"; + } + + testFiles = dataDir; + + // Use the current directory if it is not specified + String scriptsDir = conf.get("test.data.scripts"); + if (scriptsDir == null) { + scriptsDir = new File(".").getAbsolutePath() + "/data/scripts"; + } + if (!initScript.isEmpty()) { + this.initScript = scriptsDir + "/" + initScript; + } + if (!cleanupScript.isEmpty()) { + this.cleanupScript = scriptsDir + "/" + cleanupScript; + } + + overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite")); + + setup = new HiveTestSetup(); + setup.preTest(conf); + init(); + } + + public void shutdown() throws Exception { + cleanUp(); + setup.tearDown(); + if (mr != null) { + mr.shutdown(); + mr = null; + } + FileSystem.closeAll(); + if (dfs != null) { + dfs.shutdown(); + dfs = null; + } + } + + public String readEntireFileIntoString(File queryFile) throws IOException { + InputStreamReader isr = new InputStreamReader( + new BufferedInputStream(new FileInputStream(queryFile)), HiveTestUtil.UTF_8); + StringWriter sw = new StringWriter(); + try { + IOUtils.copy(isr, sw); + } finally { + if (isr != null) { + isr.close(); + } + } + return sw.toString(); + } + + public void addFile(String queryFile) throws IOException { + addFile(queryFile, false); + } + + public void addFile(String queryFile, boolean partial) throws IOException { + addFile(new File(queryFile)); + } + + public void addFile(File qf) throws IOException { + addFile(qf, false); + } + + public void addFile(File qf, boolean partial) throws IOException { + String query = readEntireFileIntoString(qf); + qMap.put(qf.getName(), query); + if (partial) return; + + if (matches(SORT_BEFORE_DIFF, query)) { + qSortSet.add(qf.getName()); + } else if (matches(SORT_QUERY_RESULTS, query)) { + qSortQuerySet.add(qf.getName()); + } else if (matches(HASH_QUERY_RESULTS, query)) { + qHashQuerySet.add(qf.getName()); + } else if (matches(SORT_AND_HASH_QUERY_RESULTS, query)) { + qSortNHashQuerySet.add(qf.getName()); + } + } + + private static final Pattern SORT_BEFORE_DIFF = Pattern.compile("-- SORT_BEFORE_DIFF"); + private static final Pattern SORT_QUERY_RESULTS = Pattern.compile("-- SORT_QUERY_RESULTS"); + private static final Pattern HASH_QUERY_RESULTS = Pattern.compile("-- HASH_QUERY_RESULTS"); + private static final Pattern SORT_AND_HASH_QUERY_RESULTS = Pattern.compile("-- " + + "SORT_AND_HASH_QUERY_RESULTS"); + + private boolean matches(Pattern pattern, String query) { + Matcher matcher = pattern.matcher(query); + if (matcher.find()) { + return true; + } + return false; + } + + /** + * Get formatted Java version to include minor version, but + * exclude patch level. + * + * @return Java version formatted as major_version.minor_version + */ + private static String getJavaVersion() { + String version = System.getProperty("java.version"); + if (version == null) { + throw new NullPointerException("No java version could be determined " + + "from system properties"); + } + + // "java version" system property is formatted + // major_version.minor_version.patch_level. + // Find second dot, instead of last dot, to be safe + int pos = version.indexOf('.'); + pos = version.indexOf('.', pos + 1); + return version.substring(0, pos); + } + + /** + * Clear out any side effects of running tests + */ + public void clearPostTestEffects() throws Exception { + setup.postTest(conf); + } + + /** + * Clear out any side effects of running tests + */ + public void clearTablesCreatedDuringTests() throws Exception { + if (System.getenv(QTEST_LEAVE_FILES) != null) { + return; + } + + // Delete any tables other than the source tables + // and any databases other than the default database. + for (String dbName : db.getAllDatabases()) { + SessionState.get().setCurrentDatabase(dbName); + for (String tblName : db.getAllTables()) { + if (!DEFAULT_DATABASE_NAME.equals(dbName)) { + Table tblObj = db.getTable(tblName); + // dropping index table can not be dropped directly. Dropping the base + // table will automatically drop all its index table + if (tblObj.isIndexTable()) { + continue; + } + db.dropTable(dbName, tblName); + } else { + // this table is defined in srcTables, drop all indexes on it + List<Index> indexes = db.getIndexes(dbName, tblName, (short) -1); + if (indexes != null && indexes.size() > 0) { + for (Index index : indexes) { + db.dropIndex(dbName, tblName, index.getIndexName(), true, true); + } + } + } + } + if (!DEFAULT_DATABASE_NAME.equals(dbName)) { + // Drop cascade, may need to drop functions + db.dropDatabase(dbName, true, true, true); + } + } + + // delete remaining directories for external tables (can affect stats for following tests) + try { + Path p = new Path(testWarehouse); + FileSystem fileSystem = p.getFileSystem(conf); + if (fileSystem.exists(p)) { + for (FileStatus status : fileSystem.listStatus(p)) { + if (status.isDir()) { + fileSystem.delete(status.getPath(), true); + } + } + } + } catch (IllegalArgumentException e) { + // ignore.. provides invalid url sometimes intentionally + } + SessionState.get().setCurrentDatabase(DEFAULT_DATABASE_NAME); + + List<String> roleNames = db.getAllRoleNames(); + for (String roleName : roleNames) { + if (!"PUBLIC".equalsIgnoreCase(roleName) && !"ADMIN".equalsIgnoreCase(roleName)) { + db.dropRole(roleName); + } + } + } + + /** + * Clear out any side effects of running tests + */ + public void clearTestSideEffects() throws Exception { + if (System.getenv(QTEST_LEAVE_FILES) != null) { + return; + } + + clearTablesCreatedDuringTests(); + } + + public void cleanUp() throws Exception { + if (!isSessionStateStarted) { + startSessionState(); + } + if (System.getenv(QTEST_LEAVE_FILES) != null) { + return; + } + + clearTablesCreatedDuringTests(); + + SessionState.get().getConf().setBoolean("hive.test.shutdown.phase", true); + + if (cleanupScript != "") { + String cleanupCommands = readEntireFileIntoString(new File(cleanupScript)); + LOG.info("Cleanup (" + cleanupScript + "):\n" + cleanupCommands); + if (cliDriver == null) { + cliDriver = new CliDriver(); + } + cliDriver.processLine(cleanupCommands); + } + + SessionState.get().getConf().setBoolean("hive.test.shutdown.phase", false); + + // delete any contents in the warehouse dir + Path p = new Path(testWarehouse); + FileSystem fs = p.getFileSystem(conf); + + try { + FileStatus[] ls = fs.listStatus(p); + for (int i = 0; (ls != null) && (i < ls.length); i++) { + fs.delete(ls[i].getPath(), true); + } + } catch (FileNotFoundException e) { + // Best effort + } + + FunctionRegistry.unregisterTemporaryUDF("test_udaf"); + FunctionRegistry.unregisterTemporaryUDF("test_error"); + } + + public void createSources() throws Exception { + if (!isSessionStateStarted) { + startSessionState(); + } + conf.setBoolean("hive.test.init.phase", true); + + if (cliDriver == null) { + cliDriver = new CliDriver(); + } + cliDriver.processLine("set test.data.dir=" + testFiles + ";"); + + conf.setBoolean("hive.test.init.phase", false); + } + + public void init() throws Exception { + testWarehouse = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); + String execEngine = conf.get("hive.execution.engine"); + conf.set("hive.execution.engine", "mr"); + SessionState.start(conf); + conf.set("hive.execution.engine", execEngine); + db = Hive.get(conf); + pd = new ParseDriver(); + sem = new SemanticAnalyzer(conf); + } + + public void init(String tname) throws Exception { + cleanUp(); + createSources(); + cliDriver.processCmd("set hive.cli.print.header=true;"); + } + + public void cliInit(String tname) throws Exception { + cliInit(tname, true); + } + + public String cliInit(String tname, boolean recreate) throws Exception { + if (recreate) { + cleanUp(); + createSources(); + } + + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, + "org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator"); + Utilities.clearWorkMap(); + CliSessionState ss = new CliSessionState(conf); + assert ss != null; + ss.in = System.in; + + String outFileExtension = getOutFileExtension(tname); + String stdoutName = null; + if (outDir != null) { + File qf = new File(outDir, tname); + stdoutName = qf.getName().concat(outFileExtension); + } else { + stdoutName = tname + outFileExtension; + } + + File outf = new File(logDir, stdoutName); + OutputStream fo = new BufferedOutputStream(new FileOutputStream(outf)); + if (qSortQuerySet.contains(tname)) { + ss.out = new SortPrintStream(fo, "UTF-8"); + } else if (qHashQuerySet.contains(tname)) { + ss.out = new DigestPrintStream(fo, "UTF-8"); + } else if (qSortNHashQuerySet.contains(tname)) { + ss.out = new SortAndDigestPrintStream(fo, "UTF-8"); + } else { + ss.out = new PrintStream(fo, true, "UTF-8"); + } + ss.err = new CachingPrintStream(fo, true, "UTF-8"); + ss.setIsSilent(true); + SessionState oldSs = SessionState.get(); + + if (oldSs != null && clusterType == MiniClusterType.tez) { + oldSs.close(); + } + + if (oldSs != null && oldSs.out != null && oldSs.out != System.out) { + oldSs.out.close(); + } + SessionState.start(ss); + + cliDriver = new CliDriver(); + cliDriver.processInitFiles(ss); + + return outf.getAbsolutePath(); + } + + private CliSessionState startSessionState() + throws IOException { + + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, + "org.apache.hadoop.hive.ql.security.HadoopDefaultAuthenticator"); + + String execEngine = conf.get("hive.execution.engine"); + conf.set("hive.execution.engine", "mr"); + CliSessionState ss = new CliSessionState(conf); + assert ss != null; + ss.in = System.in; + ss.out = System.out; + ss.err = System.out; + + SessionState oldSs = SessionState.get(); + if (oldSs != null && clusterType == MiniClusterType.tez) { + oldSs.close(); + } + if (oldSs != null && oldSs.out != null && oldSs.out != System.out) { + oldSs.out.close(); + } + SessionState.start(ss); + + isSessionStateStarted = true; + + conf.set("hive.execution.engine", execEngine); + return ss; + } + + public int executeOne(String tname) { + String q = qMap.get(tname); + + if (q.indexOf(";") == -1) { + return -1; + } + + String q1 = q.substring(0, q.indexOf(";") + 1); + String qrest = q.substring(q.indexOf(";") + 1); + qMap.put(tname, qrest); + + LOG.info("Executing " + q1); + return cliDriver.processLine(q1); + } + + public static final String CRLF = System.getProperty("line.separator"); + + public int executeClient(String tname1, String tname2) { + String commands = getCommands(tname1) + CRLF + getCommands(tname2); + return cliDriver.processLine(commands); + } + + public int executeClient(String tname) { + return cliDriver.processLine(getCommands(tname), false); + } + + private String getCommands(String tname) { + String commands = qMap.get(tname); + StringBuilder newCommands = new StringBuilder(commands.length()); + int lastMatchEnd = 0; + Matcher commentMatcher = Pattern.compile("^--.*$", Pattern.MULTILINE).matcher(commands); + while (commentMatcher.find()) { + newCommands.append(commands.substring(lastMatchEnd, commentMatcher.start())); + newCommands.append(commentMatcher.group().replaceAll("(?<!\\\\);", "\\\\;")); + lastMatchEnd = commentMatcher.end(); + } + newCommands.append(commands.substring(lastMatchEnd, commands.length())); + commands = newCommands.toString(); + return commands; + } + + public boolean shouldBeSkipped(String tname) { + return qSkipSet.contains(tname); + } + + private String getOutFileExtension(String fname) { + String outFileExtension = ".out"; + if (qJavaVersionSpecificOutput.contains(fname)) { + outFileExtension = ".java" + javaVersion + ".out"; + } + + return outFileExtension; + } + + /** + * Given the current configurations (e.g., hadoop version and execution mode), return + * the correct file name to compare with the current test run output. + * + * @param outDir The directory where the reference log files are stored. + * @param testName The test file name (terminated by ".out"). + * @return The file name appended with the configuration values if it exists. + */ + public String outPath(String outDir, String testName) { + String ret = (new File(outDir, testName)).getPath(); + // List of configurations. Currently the list consists of hadoop version and execution + // mode only + List<String> configs = new ArrayList<String>(); + configs.add(this.hadoopVer); + + Deque<String> stack = new LinkedList<String>(); + StringBuilder sb = new StringBuilder(); + sb.append(testName); + stack.push(sb.toString()); + + // example file names are input1.q.out_0.20.0_minimr or input2.q.out_0.17 + for (String s : configs) { + sb.append('_'); + sb.append(s); + stack.push(sb.toString()); + } + while (stack.size() > 0) { + String fileName = stack.pop(); + File f = new File(outDir, fileName); + if (f.exists()) { + ret = f.getPath(); + break; + } + } + return ret; + } + + private Pattern[] toPattern(String[] patternStrs) { + Pattern[] patterns = new Pattern[patternStrs.length]; + for (int i = 0; i < patternStrs.length; i++) { + patterns[i] = Pattern.compile(patternStrs[i]); + } + return patterns; + } + + private void maskPatterns(Pattern[] patterns, String fname) throws Exception { + String maskPattern = "#### A masked pattern was here ####"; + + String line; + BufferedReader in; + BufferedWriter out; + + File file = new File(fname); + File fileOrig = new File(fname + ".orig"); + FileUtils.copyFile(file, fileOrig); + + in = new BufferedReader(new InputStreamReader(new FileInputStream(fileOrig), "UTF-8")); + out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8")); + + boolean lastWasMasked = false; + while (null != (line = in.readLine())) { + for (Pattern pattern : patterns) { + line = pattern.matcher(line).replaceAll(maskPattern); + } + + if (line.equals(maskPattern)) { + // We're folding multiple masked lines into one. + if (!lastWasMasked) { + out.write(line); + out.write("\n"); + lastWasMasked = true; + } + } else { + out.write(line); + out.write("\n"); + lastWasMasked = false; + } + } + + in.close(); + out.close(); + } + + private final Pattern[] planMask = toPattern(new String[]{ + ".*file:.*", + ".*pfile:.*", + ".*hdfs:.*", + ".*/tmp/.*", + ".*invalidscheme:.*", + ".*lastUpdateTime.*", + ".*lastAccessTime.*", + ".*lastModifiedTime.*", + ".*[Oo]wner.*", + ".*CreateTime.*", + ".*LastAccessTime.*", + ".*Location.*", + ".*LOCATION '.*", + ".*transient_lastDdlTime.*", + ".*last_modified_.*", + ".*at org.*", + ".*at sun.*", + ".*at java.*", + ".*at junit.*", + ".*Caused by:.*", + ".*LOCK_QUERYID:.*", + ".*LOCK_TIME:.*", + ".*grantTime.*", + ".*[.][.][.] [0-9]* more.*", + ".*job_[0-9_]*.*", + ".*job_local[0-9_]*.*", + ".*USING 'java -cp.*", + "^Deleted.*", + ".*DagName:.*", + ".*Input:.*/data/files/.*", + ".*Output:.*/data/files/.*", + ".*total number of created files now is.*" + }); + + public int checkCliDriverResults(String tname) throws Exception { + assert (qMap.containsKey(tname)); + + String outFileExtension = getOutFileExtension(tname); + String outFileName = outPath(outDir, tname + outFileExtension); + + File f = new File(logDir, tname + outFileExtension); + + maskPatterns(planMask, f.getPath()); + int exitVal = executeDiffCommand(f.getPath(), + outFileName, false, + qSortSet.contains(tname)); + + if (exitVal != 0 && overWrite) { + exitVal = overwriteResults(f.getPath(), outFileName); + } + + return exitVal; + } + + + public int checkCompareCliDriverResults(String tname, List<String> outputs) throws Exception { + assert outputs.size() > 1; + maskPatterns(planMask, outputs.get(0)); + for (int i = 1; i < outputs.size(); ++i) { + maskPatterns(planMask, outputs.get(i)); + int ecode = executeDiffCommand( + outputs.get(i - 1), outputs.get(i), false, qSortSet.contains(tname)); + if (ecode != 0) { + LOG.info("Files don't match: " + outputs.get(i - 1) + " and " + outputs.get(i)); + return ecode; + } + } + return 0; + } + + private static int overwriteResults(String inFileName, String outFileName) throws Exception { + // This method can be replaced with Files.copy(source, target, REPLACE_EXISTING) + // once Hive uses JAVA 7. + LOG.info("Overwriting results " + inFileName + " to " + outFileName); + return executeCmd(new String[]{ + "cp", + getQuotedString(inFileName), + getQuotedString(outFileName) + }); + } + + private static int executeDiffCommand(String inFileName, + String outFileName, + boolean ignoreWhiteSpace, + boolean sortResults + ) throws Exception { + + int result = 0; + + if (sortResults) { + // sort will try to open the output file in write mode on windows. We need to + // close it first. + SessionState ss = SessionState.get(); + if (ss != null && ss.out != null && ss.out != System.out) { + ss.out.close(); + } + + String inSorted = inFileName + SORT_SUFFIX; + String outSorted = outFileName + SORT_SUFFIX; + + result = sortFiles(inFileName, inSorted); + result |= sortFiles(outFileName, outSorted); + if (result != 0) { + LOG.error("ERROR: Could not sort files before comparing"); + return result; + } + inFileName = inSorted; + outFileName = outSorted; + } + + ArrayList<String> diffCommandArgs = new ArrayList<String>(); + diffCommandArgs.add("diff"); + + // Text file comparison + diffCommandArgs.add("-a"); + + // Ignore changes in the amount of white space + if (ignoreWhiteSpace || Shell.WINDOWS) { + diffCommandArgs.add("-b"); + } + + // Files created on Windows machines have different line endings + // than files created on Unix/Linux. Windows uses carriage return and line feed + // ("\r\n") as a line ending, whereas Unix uses just line feed ("\n"). + // Also StringBuilder.toString(), Stream to String conversions adds extra + // spaces at the end of the line. + if (Shell.WINDOWS) { + diffCommandArgs.add("--strip-trailing-cr"); // Strip trailing carriage return on input + diffCommandArgs.add("-B"); // Ignore changes whose lines are all blank + } + // Add files to compare to the arguments list + diffCommandArgs.add(getQuotedString(inFileName)); + diffCommandArgs.add(getQuotedString(outFileName)); + + result = executeCmd(diffCommandArgs); + + if (sortResults) { + new File(inFileName).delete(); + new File(outFileName).delete(); + } + + return result; + } + + private static int sortFiles(String in, String out) throws Exception { + return executeCmd(new String[]{ + "sort", + getQuotedString(in), + }, out, null); + } + + private static int executeCmd(Collection<String> args) throws Exception { + return executeCmd(args, null, null); + } + + private static int executeCmd(String[] args) throws Exception { + return executeCmd(args, null, null); + } + + private static int executeCmd(Collection<String> args, String outFile, String errFile) throws + Exception { + String[] cmdArray = args.toArray(new String[args.size()]); + return executeCmd(cmdArray, outFile, errFile); + } + + private static int executeCmd(String[] args, String outFile, String errFile) throws Exception { + LOG.info("Running: " + org.apache.commons.lang.StringUtils.join(args, ' ')); + + PrintStream out = outFile == null ? + SessionState.getConsole().getChildOutStream() : + new PrintStream(new FileOutputStream(outFile), true); + PrintStream err = errFile == null ? + SessionState.getConsole().getChildErrStream() : + new PrintStream(new FileOutputStream(errFile), true); + + Process executor = Runtime.getRuntime().exec(args); + + StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, err); + StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, out); + + outPrinter.start(); + errPrinter.start(); + + int result = executor.waitFor(); + + outPrinter.join(); + errPrinter.join(); + + if (outFile != null) { + out.close(); + } + + if (errFile != null) { + err.close(); + } + + return result; + } + + private static String getQuotedString(String str) { + return Shell.WINDOWS ? String.format("\"%s\"", str) : str; + } + + public ASTNode parseQuery(String tname) throws Exception { + return pd.parse(qMap.get(tname)); + } + + public void resetParser() throws SemanticException { + pd = new ParseDriver(); + sem = new SemanticAnalyzer(conf); + } + + public TreeMap<String, String> getQMap() { + return qMap; + } + + /** + * HiveTestSetup defines test fixtures which are reused across testcases, + * and are needed before any test can be run + */ + public static class HiveTestSetup { + private MiniZooKeeperCluster zooKeeperCluster = null; + private int zkPort; + private ZooKeeper zooKeeper; + + public HiveTestSetup() { + } + + public void preTest(HiveConf conf) throws Exception { + + if (zooKeeperCluster == null) { + //create temp dir + String tmpBaseDir = System.getProperty("test.tmp.dir"); + File tmpDir = Utilities.createTempDir(tmpBaseDir); + + zooKeeperCluster = new MiniZooKeeperCluster(); + zkPort = zooKeeperCluster.startup(tmpDir); + } + + if (zooKeeper != null) { + zooKeeper.close(); + } + + int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars + .HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); + zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() { + @Override + public void process(WatchedEvent arg0) { + } + }); + + String zkServer = "localhost"; + conf.set("hive.zookeeper.quorum", zkServer); + conf.set("hive.zookeeper.client.port", "" + zkPort); + } + + public void postTest(HiveConf conf) throws Exception { + if (zooKeeperCluster == null) { + return; + } + + if (zooKeeper != null) { + zooKeeper.close(); + } + + ZooKeeperHiveLockManager.releaseAllLocks(conf); + } + + public void tearDown() throws Exception { + if (zooKeeperCluster != null) { + zooKeeperCluster.shutdown(); + zooKeeperCluster = null; + } + } + } + + /** + * QTRunner: Runnable class for running a a single query file. + **/ + public static class HiveTestRunner implements Runnable { + private final HiveTestUtil qt; + private final String fname; + + public HiveTestRunner(HiveTestUtil qt, String fname) { + this.qt = qt; + this.fname = fname; + } + + @Override + public void run() { + try { + // assumption is that environment has already been cleaned once globally + // hence each thread does not call cleanUp() and createSources() again + qt.cliInit(fname, false); + qt.executeClient(fname); + } catch (Throwable e) { + LOG.error("Query file " + fname + " failed with exception ", e); + e.printStackTrace(); + outputTestFailureHelpMessage(); + } + } + } + + /** + * Setup to execute a set of query files. Uses HiveTestUtil to do so. + * + * @param qfiles array of input query files containing arbitrary number of hive + * queries + * @param resDir output directory + * @param logDir log directory + * @return one HiveTestUtil for each query file + */ + public static HiveTestUtil[] queryListRunnerSetup(File[] qfiles, String resDir, + String logDir) throws Exception { + HiveTestUtil[] qt = new HiveTestUtil[qfiles.length]; + for (int i = 0; i < qfiles.length; i++) { + qt[i] = new HiveTestUtil(resDir, logDir, MiniClusterType.mr, null, "0.20"); + qt[i].addFile(qfiles[i]); + qt[i].clearTestSideEffects(); + } + + return qt; + } + + /** + * Executes a set of query files in sequence. + * + * @param qfiles array of input query files containing arbitrary number of hive + * queries + * @param qt array of HiveTestUtils, one per qfile + * @return true if all queries passed, false otw + */ + public static boolean queryListRunnerSingleThreaded(File[] qfiles, HiveTestUtil[] qt) + throws Exception { + boolean failed = false; + qt[0].cleanUp(); + qt[0].createSources(); + for (int i = 0; i < qfiles.length && !failed; i++) { + qt[i].clearTestSideEffects(); + qt[i].cliInit(qfiles[i].getName(), false); + qt[i].executeClient(qfiles[i].getName()); + int ecode = qt[i].checkCliDriverResults(qfiles[i].getName()); + if (ecode != 0) { + failed = true; + LOG.error("Test " + qfiles[i].getName() + + " results check failed with error code " + ecode); + outputTestFailureHelpMessage(); + } + qt[i].clearPostTestEffects(); + } + return (!failed); + } + + public static void outputTestFailureHelpMessage() { + LOG.error("See ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, " + + "or check ./ql/target/surefire-reports or " + + "./itests/qtest/target/surefire-reports/ for specific test cases logs."); + } + + public static String ensurePathEndsInSlash(String path) { + if (path == null) { + throw new NullPointerException("Path cannot be null"); + } + if (path.endsWith(File.separator)) { + return path; + } else { + return path + File.separator; + } + } + + private static String[] cachedQvFileList = null; + private static ImmutableList<String> cachedDefaultQvFileList = null; + private static Pattern qvSuffix = Pattern.compile("_[0-9]+.qv$", Pattern.CASE_INSENSITIVE); + + public static List<String> getVersionFiles(String queryDir, String tname) { + ensureQvFileList(queryDir); + List<String> result = getVersionFilesInternal(tname); + if (result == null) { + result = cachedDefaultQvFileList; + } + return result; + } + + private static void ensureQvFileList(String queryDir) { + if (cachedQvFileList != null) return; + // Not thread-safe. + LOG.info("Getting versions from " + queryDir); + cachedQvFileList = (new File(queryDir)).list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(".qv"); + } + }); + if (cachedQvFileList == null) return; // no files at all + Arrays.sort(cachedQvFileList, String.CASE_INSENSITIVE_ORDER); + List<String> defaults = getVersionFilesInternal("default"); + cachedDefaultQvFileList = (defaults != null) + ? ImmutableList.copyOf(defaults) : ImmutableList.<String>of(); + } + + private static List<String> getVersionFilesInternal(String tname) { + if (cachedQvFileList == null) { + return new ArrayList<String>(); + } + int pos = Arrays.binarySearch(cachedQvFileList, tname, String.CASE_INSENSITIVE_ORDER); + if (pos >= 0) { + throw new BuildException("Unexpected file list element: " + cachedQvFileList[pos]); + } + List<String> result = null; + for (pos = (-pos - 1); pos < cachedQvFileList.length; ++pos) { + String candidate = cachedQvFileList[pos]; + if (candidate.length() <= tname.length() + || !tname.equalsIgnoreCase(candidate.substring(0, tname.length())) + || !qvSuffix.matcher(candidate.substring(tname.length())).matches()) { + break; + } + if (result == null) { + result = new ArrayList<String>(); + } + result.add(candidate); + } + return result; + } + + public void failed(int ecode, String fname, String debugHint) { + String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null; + Assert.fail("Client Execution failed with error code = " + ecode + + (command != null ? " running " + command : "") + (debugHint != null ? debugHint : + "")); + } + + // for negative tests, which is succeeded.. no need to print the query string + public void failed(String fname, String debugHint) { + Assert.fail("Client Execution was expected to fail, but succeeded with error code 0 " + + (debugHint != null ? debugHint : "")); + } + + public void failedDiff(int ecode, String fname, String debugHint) { + Assert.fail("Client Execution results failed with error code = " + ecode + + (debugHint != null ? debugHint : "")); + } + + public void failed(Throwable e, String fname, String debugHint) { + String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null; + LOG.error("Exception: ", e); + e.printStackTrace(); + LOG.error("Failed query: " + fname); + Assert.fail("Unexpected exception " + + org.apache.hadoop.util.StringUtils.stringifyException(e) + "\n" + + (command != null ? " running " + command : "") + + (debugHint != null ? debugHint : "")); + } + + public static class WindowsPathUtil { + + public static void convertPathsFromWindowsToHdfs(HiveConf conf) { + // Following local paths are used as HDFS paths in unit tests. + // It works well in Unix as the path notation in Unix and HDFS is more or less same. + // But when it comes to Windows, drive letter separator ':' & backslash '\" are invalid + // characters in HDFS so we need to converts these local paths to HDFS paths before + // using them + // in unit tests. + + String orgWarehouseDir = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); + conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, getHdfsUriString(orgWarehouseDir)); + + String orgTestTempDir = System.getProperty("test.tmp.dir"); + System.setProperty("test.tmp.dir", getHdfsUriString(orgTestTempDir)); + + String orgTestWarehouseDir = System.getProperty("test.warehouse.dir"); + System.setProperty("test.warehouse.dir", getHdfsUriString(orgTestWarehouseDir)); + + String orgScratchDir = conf.getVar(HiveConf.ConfVars.SCRATCHDIR); + conf.setVar(HiveConf.ConfVars.SCRATCHDIR, getHdfsUriString(orgScratchDir)); + } + + public static String getHdfsUriString(String uriStr) { + assert uriStr != null; + if (Shell.WINDOWS) { + // If the URI conversion is from Windows to HDFS then replace the '\' with '/' + // and remove the windows single drive letter & colon from absolute path. + return uriStr.replace('\\', '/') + .replaceFirst("/[c-zC-Z]:", "/") + .replaceFirst("^[c-zC-Z]:", ""); + } + return uriStr; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java new file mode 100644 index 0000000..d920517 --- /dev/null +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive; + +import com.google.common.base.CharMatcher; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.util.PhoenixConnectionUtil; +import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; +import org.apache.phoenix.hive.util.PhoenixUtil; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Implementation for notification methods which are invoked as part of transactions against the + * hive metastore,allowing Phoenix metadata to be kept in sync with Hive'smetastore. + */ +public class PhoenixMetaHook implements HiveMetaHook { + + private static final Log LOG = LogFactory.getLog(PhoenixMetaHook.class); + + @Override + public void preCreateTable(Table table) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("Precreate table : " + table.getTableName()); + } + + try (Connection conn = PhoenixConnectionUtil.getConnection(table)) { + String tableType = table.getTableType(); + String tableName = PhoenixStorageHandlerUtil.getTargetTableName(table); + + if (TableType.EXTERNAL_TABLE.name().equals(tableType)) { + // Check whether phoenix table exists. + if (!PhoenixUtil.existTable(conn, tableName)) { + // Error if phoenix table not exist. + throw new MetaException("Phoenix table " + tableName + " doesn't exist"); + } + } else if (TableType.MANAGED_TABLE.name().equals(tableType)) { + // Check whether phoenix table exists. + if (PhoenixUtil.existTable(conn, tableName)) { + // Error if phoenix table already exist. + throw new MetaException("Phoenix table " + tableName + " already exist."); + } + + PhoenixUtil.createTable(conn, createTableStatement(table)); + } else { + throw new MetaException("Unsupported table Type: " + table.getTableType()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Phoenix table " + tableName + " was created"); + } + } catch (SQLException e) { + throw new MetaException(e.getMessage()); + } + } + + private String createTableStatement(Table table) throws MetaException { + Map<String, String> tableParameterMap = table.getParameters(); + + String tableName = PhoenixStorageHandlerUtil.getTargetTableName(table); + StringBuilder ddl = new StringBuilder("create table ").append(tableName).append(" (\n"); + + String phoenixRowKeys = tableParameterMap.get(PhoenixStorageHandlerConstants + .PHOENIX_ROWKEYS); + StringBuilder realRowKeys = new StringBuilder(); + List<String> phoenixRowKeyList = Lists.newArrayList(Splitter.on + (PhoenixStorageHandlerConstants.COMMA).trimResults().split(phoenixRowKeys)); + Map<String, String> columnMappingMap = getColumnMappingMap(tableParameterMap.get + (PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING)); + + List<FieldSchema> fieldSchemaList = table.getSd().getCols(); + for (int i = 0, limit = fieldSchemaList.size(); i < limit; i++) { + FieldSchema fieldSchema = fieldSchemaList.get(i); + String fieldName = fieldSchema.getName(); + String fieldType = fieldSchema.getType(); + String columnType = PhoenixUtil.getPhoenixType(fieldType); + + String rowKeyName = getRowKeyMapping(fieldName, phoenixRowKeyList); + if (rowKeyName != null) { + // In case of RowKey + if ("binary".equals(columnType)) { + // Phoenix must define max length of binary when type definition. Obtaining + // information from the column mapping. ex) phoenix.rowkeys = "r1, r2(100), ..." + List<String> tokenList = Lists.newArrayList(Splitter.on(CharMatcher.is('(') + .or(CharMatcher.is(')'))).trimResults().split(rowKeyName)); + columnType = columnType + "(" + tokenList.get(1) + ")"; + rowKeyName = tokenList.get(0); + } + + ddl.append(" ").append(rowKeyName).append(" ").append(columnType).append(" not " + + "null,\n"); + realRowKeys.append(rowKeyName).append(","); + } else { + // In case of Column + String columnName = columnMappingMap.get(fieldName); + + if (columnName == null) { + // Use field definition. + columnName = fieldName; + } + + if ("binary".equals(columnType)) { + // Phoenix must define max length of binary when type definition. Obtaining + // information from the column mapping. ex) phoenix.column.mapping=c1:c1(100) + List<String> tokenList = Lists.newArrayList(Splitter.on(CharMatcher.is('(') + .or(CharMatcher.is(')'))).trimResults().split(columnName)); + columnType = columnType + "(" + tokenList.get(1) + ")"; + columnName = tokenList.get(0); + } + + ddl.append(" ").append(columnName).append(" ").append(columnType).append(",\n"); + } + } + ddl.append(" ").append("constraint pk_").append(tableName).append(" primary key(") + .append(realRowKeys.deleteCharAt(realRowKeys.length() - 1)).append(")\n)\n"); + + String tableOptions = tableParameterMap.get(PhoenixStorageHandlerConstants + .PHOENIX_TABLE_OPTIONS); + if (tableOptions != null) { + ddl.append(tableOptions); + } + + String statement = ddl.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("DDL : " + statement); + } + + return statement; + } + + private String getRowKeyMapping(String rowKeyName, List<String> phoenixRowKeyList) { + String rowKeyMapping = null; + + for (String phoenixRowKey : phoenixRowKeyList) { + if (phoenixRowKey.equals(rowKeyName)) { + rowKeyMapping = phoenixRowKey; + break; + } else if (phoenixRowKey.startsWith(rowKeyName + "(") && phoenixRowKey.endsWith(")")) { + rowKeyMapping = phoenixRowKey; + break; + } + } + + return rowKeyMapping; + } + + private Map<String, String> getColumnMappingMap(String columnMappings) { + if (LOG.isDebugEnabled()) { + LOG.debug("Column mappings : " + columnMappings); + } + + if (columnMappings == null) { + if (LOG.isInfoEnabled()) { + LOG.info("phoenix.column.mapping not set. using field definition"); + } + + return Collections.emptyMap(); + } + + Map<String, String> columnMappingMap = Splitter.on(PhoenixStorageHandlerConstants.COMMA) + .trimResults().withKeyValueSeparator(PhoenixStorageHandlerConstants.COLON).split + (columnMappings); + + if (LOG.isDebugEnabled()) { + LOG.debug("Column mapping map : " + columnMappingMap); + } + + return columnMappingMap; + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("Rollback for table : " + table.getTableName()); + } + + dropTableIfExist(table); + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + + } + + @Override + public void preDropTable(Table table) throws MetaException { + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + } + + @Override + public void commitDropTable(Table table, boolean deleteData) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("Commit drop table : " + table.getTableName()); + } + + dropTableIfExist(table); + } + + private void dropTableIfExist(Table table) throws MetaException { + try (Connection conn = PhoenixConnectionUtil.getConnection(table)) { + String tableType = table.getTableType(); + String tableName = PhoenixStorageHandlerUtil.getTargetTableName(table); + + if (TableType.MANAGED_TABLE.name().equals(tableType)) { + // Drop if phoenix table exist. + if (PhoenixUtil.existTable(conn, tableName)) { + PhoenixUtil.dropTable(conn, tableName); + } + } + } catch (SQLException e) { + throw new MetaException(e.getMessage()); + } + } +}