HIVE-17561 Move TxnStore and implementations to standalone metastore (Alan Gates, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f4a12a56 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f4a12a56 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f4a12a56 Branch: refs/heads/hive-14535 Commit: f4a12a56122a9b72dd60852617cca4e8c9d84cc9 Parents: c1f3d9a Author: Alan Gates <[email protected]> Authored: Fri Oct 6 09:56:44 2017 -0700 Committer: Alan Gates <[email protected]> Committed: Fri Oct 6 09:56:44 2017 -0700 ---------------------------------------------------------------------- .../hive/common/ValidCompactorTxnList.java | 89 - .../hadoop/hive/common/ValidReadTxnList.java | 237 -- .../apache/hadoop/hive/common/ValidTxnList.java | 112 - .../hive/common/TestValidReadTxnList.java | 109 - .../hive/hcatalog/streaming/TestStreaming.java | 4 +- .../streaming/mutate/StreamingTestUtils.java | 4 +- .../hive/metastore/TestHiveMetaStoreTxns.java | 4 +- .../apache/hadoop/hive/ql/TestAcidOnTez.java | 5 +- .../hive/ql/txn/compactor/TestCompactor.java | 4 +- .../hadoop/hive/metastore/HiveMetaStore.java | 10 + .../datasource/BoneCPDataSourceProvider.java | 95 - .../datasource/DataSourceProvider.java | 79 - .../datasource/DataSourceProviderFactory.java | 47 - .../datasource/HikariCPDataSourceProvider.java | 97 - .../hive/metastore/datasource/package-info.java | 23 - .../hive/metastore/tools/SQLGenerator.java | 172 - .../hive/metastore/txn/CompactionInfo.java | 151 - .../metastore/txn/CompactionTxnHandler.java | 960 ----- .../hadoop/hive/metastore/txn/TxnDbUtil.java | 369 -- .../hadoop/hive/metastore/txn/TxnHandler.java | 3667 ------------------ .../hadoop/hive/metastore/txn/TxnStore.java | 433 --- .../hadoop/hive/metastore/txn/TxnUtils.java | 242 -- .../TestDataSourceProviderFactory.java | 173 - .../metastore/txn/TestTxnHandlerNegative.java | 54 - .../hadoop/hive/metastore/txn/TestTxnUtils.java | 215 - .../txn/TestValidCompactorTxnList.java | 136 - .../hive/ql/txn/AcidOpenTxnsCounterService.java | 75 - .../metastore/txn/TestCompactionTxnHandler.java | 4 +- .../hive/metastore/txn/TestTxnHandler.java | 4 +- .../txn/TestTxnHandlerNoConnectionPool.java | 6 +- .../apache/hadoop/hive/ql/TestTxnCommands.java | 23 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 16 +- .../hadoop/hive/ql/TxnCommandsBaseForTests.java | 5 +- .../hive/ql/lockmgr/TestDbTxnManager.java | 28 +- .../hive/ql/lockmgr/TestDbTxnManager2.java | 356 +- .../hive/ql/txn/compactor/CompactorTest.java | 2 +- standalone-metastore/pom.xml | 20 + .../hive/metastore/RunnableConfigurable.java | 26 + .../hive/metastore/conf/MetastoreConf.java | 63 +- .../datasource/BoneCPDataSourceProvider.java | 94 + .../datasource/DataSourceProvider.java | 79 + .../datasource/DataSourceProviderFactory.java | 47 + .../datasource/HikariCPDataSourceProvider.java | 96 + .../hive/metastore/datasource/package-info.java | 23 + .../hive/metastore/tools/SQLGenerator.java | 172 + .../txn/AcidOpenTxnsCounterService.java | 63 + .../hive/metastore/txn/CompactionInfo.java | 151 + .../metastore/txn/CompactionTxnHandler.java | 961 +++++ .../hadoop/hive/metastore/txn/TxnDbUtil.java | 381 ++ .../hadoop/hive/metastore/txn/TxnHandler.java | 3643 +++++++++++++++++ .../hadoop/hive/metastore/txn/TxnStore.java | 420 ++ .../hadoop/hive/metastore/txn/TxnUtils.java | 243 ++ .../hadoop/hive/metastore/utils/JavaUtils.java | 46 + .../hive/metastore/utils/StringableMap.java | 80 + .../hive/metastore/conf/TestMetastoreConf.java | 17 + .../TestDataSourceProviderFactory.java | 170 + .../metastore/txn/TestTxnHandlerNegative.java | 55 + .../hadoop/hive/metastore/txn/TestTxnUtils.java | 216 ++ storage-api/pom.xml | 7 + .../hive/common/ValidCompactorTxnList.java | 89 + .../hadoop/hive/common/ValidReadTxnList.java | 235 ++ .../apache/hadoop/hive/common/ValidTxnList.java | 112 + .../hive/common/TestValidCompactorTxnList.java | 134 + .../hive/common/TestValidReadTxnList.java | 109 + 64 files changed, 7979 insertions(+), 7783 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java deleted file mode 100644 index eaa0b34..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common; - -import java.util.Arrays; -import java.util.BitSet; - -/** - * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. - * - * Compaction should only include txns up to smallest open txn (exclussive). - * There may be aborted txns in the snapshot represented by this ValidCompactorTxnList. - * Thus {@link #isTxnRangeValid(long, long)} returns NONE for any range that inluces any unresolved - * transactions. Any txn above {@code highWatermark} is unresolved. - * These produce the logic we need to assure that the compactor only sees records less than the lowest - * open transaction when choosing which files to compact, but that it still ignores aborted - * records when compacting. - * - * See org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList() for proper - * way to construct this. - */ -public class ValidCompactorTxnList extends ValidReadTxnList { - public ValidCompactorTxnList() { - super(); - } - public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) { - this(abortedTxnList, abortedBits, highWatermark, Long.MAX_VALUE); - } - /** - * @param abortedTxnList list of all aborted transactions - * @param abortedBits bitset marking whether the corresponding transaction is aborted - * @param highWatermark highest committed transaction to be considered for compaction, - * equivalently (lowest_open_txn - 1). - */ - public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark, long minOpenTxnId) { - // abortedBits should be all true as everything in exceptions are aborted txns - super(abortedTxnList, abortedBits, highWatermark, minOpenTxnId); - if(this.exceptions.length <= 0) { - return; - } - //now that exceptions (aka abortedTxnList) is sorted - int idx = Arrays.binarySearch(this.exceptions, highWatermark); - int lastElementPos; - if(idx < 0) { - int insertionPoint = -idx - 1 ;//see Arrays.binarySearch() JavaDoc - lastElementPos = insertionPoint - 1; - } - else { - lastElementPos = idx; - } - /** - * ensure that we throw out any exceptions above highWatermark to make - * {@link #isTxnValid(long)} faster - */ - this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1); - } - public ValidCompactorTxnList(String value) { - super(value); - } - /** - * Returns org.apache.hadoop.hive.common.ValidTxnList.RangeResponse.ALL if all txns in - * the range are resolved and RangeResponse.NONE otherwise - */ - @Override - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { - return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; - } - - @Override - public boolean isTxnAborted(long txnid) { - return Arrays.binarySearch(exceptions, txnid) >= 0; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java deleted file mode 100644 index 002afd6..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common; - -import com.google.common.annotations.VisibleForTesting; - -import java.util.Arrays; -import java.util.BitSet; - -/** - * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers. - * This class will view a transaction as valid only if it is committed. Both open and aborted - * transactions will be seen as invalid. - */ -public class ValidReadTxnList implements ValidTxnList { - - protected long[] exceptions; - protected BitSet abortedBits; // BitSet for flagging aborted transactions. Bit is true if aborted, false if open - //default value means there are no open txn in the snapshot - private long minOpenTxn = Long.MAX_VALUE; - protected long highWatermark; - - public ValidReadTxnList() { - this(new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE); - } - - /** - * Used if there are no open transactions in the snapshot - */ - public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark) { - this(exceptions, abortedBits, highWatermark, Long.MAX_VALUE); - } - public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark, long minOpenTxn) { - if (exceptions.length > 0) { - this.minOpenTxn = minOpenTxn; - } - this.exceptions = exceptions; - this.abortedBits = abortedBits; - this.highWatermark = highWatermark; - } - - public ValidReadTxnList(String value) { - readFromString(value); - } - - @Override - public boolean isTxnValid(long txnid) { - if (highWatermark < txnid) { - return false; - } - return Arrays.binarySearch(exceptions, txnid) < 0; - } - - /** - * We cannot use a base file if its range contains an open txn. - * @param txnid from base_xxxx - */ - @Override - public boolean isValidBase(long txnid) { - return minOpenTxn > txnid && txnid <= highWatermark; - } - @Override - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { - // check the easy cases first - if (highWatermark < minTxnId) { - return RangeResponse.NONE; - } else if (exceptions.length > 0 && exceptions[0] > maxTxnId) { - return RangeResponse.ALL; - } - - // since the exceptions and the range in question overlap, count the - // exceptions in the range - long count = Math.max(0, maxTxnId - highWatermark); - for(long txn: exceptions) { - if (minTxnId <= txn && txn <= maxTxnId) { - count += 1; - } - } - - if (count == 0) { - return RangeResponse.ALL; - } else if (count == (maxTxnId - minTxnId + 1)) { - return RangeResponse.NONE; - } else { - return RangeResponse.SOME; - } - } - - @Override - public String toString() { - return writeToString(); - } - - @Override - public String writeToString() { - StringBuilder buf = new StringBuilder(); - buf.append(highWatermark); - buf.append(':'); - buf.append(minOpenTxn); - if (exceptions.length == 0) { - buf.append(':'); // separator for open txns - buf.append(':'); // separator for aborted txns - } else { - StringBuilder open = new StringBuilder(); - StringBuilder abort = new StringBuilder(); - for (int i = 0; i < exceptions.length; i++) { - if (abortedBits.get(i)) { - if (abort.length() > 0) { - abort.append(','); - } - abort.append(exceptions[i]); - } else { - if (open.length() > 0) { - open.append(','); - } - open.append(exceptions[i]); - } - } - buf.append(':'); - buf.append(open); - buf.append(':'); - buf.append(abort); - } - return buf.toString(); - } - - @Override - public void readFromString(String src) { - if (src == null || src.length() == 0) { - highWatermark = Long.MAX_VALUE; - exceptions = new long[0]; - abortedBits = new BitSet(); - } else { - String[] values = src.split(":"); - highWatermark = Long.parseLong(values[0]); - minOpenTxn = Long.parseLong(values[1]); - String[] openTxns = new String[0]; - String[] abortedTxns = new String[0]; - if (values.length < 3) { - openTxns = new String[0]; - abortedTxns = new String[0]; - } else if (values.length == 3) { - if (!values[2].isEmpty()) { - openTxns = values[2].split(","); - } - } else { - if (!values[2].isEmpty()) { - openTxns = values[2].split(","); - } - if (!values[3].isEmpty()) { - abortedTxns = values[3].split(","); - } - } - exceptions = new long[openTxns.length + abortedTxns.length]; - int i = 0; - for (String open : openTxns) { - exceptions[i++] = Long.parseLong(open); - } - for (String abort : abortedTxns) { - exceptions[i++] = Long.parseLong(abort); - } - Arrays.sort(exceptions); - abortedBits = new BitSet(exceptions.length); - for (String abort : abortedTxns) { - int index = Arrays.binarySearch(exceptions, Long.parseLong(abort)); - abortedBits.set(index); - } - } - } - - @Override - public long getHighWatermark() { - return highWatermark; - } - - @Override - public long[] getInvalidTransactions() { - return exceptions; - } - - @Override - public Long getMinOpenTxn() { - return minOpenTxn == Long.MAX_VALUE ? null : minOpenTxn; - } - - @Override - public boolean isTxnAborted(long txnid) { - int index = Arrays.binarySearch(exceptions, txnid); - return index >= 0 && abortedBits.get(index); - } - - @Override - public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) { - // check the easy cases first - if (highWatermark < minTxnId) { - return RangeResponse.NONE; - } - - int count = 0; // number of aborted txns found in exceptions - - // traverse the aborted txns list, starting at first aborted txn index - for (int i = abortedBits.nextSetBit(0); i >= 0; i = abortedBits.nextSetBit(i + 1)) { - long abortedTxnId = exceptions[i]; - if (abortedTxnId > maxTxnId) { // we've already gone beyond the specified range - break; - } - if (abortedTxnId >= minTxnId && abortedTxnId <= maxTxnId) { - count++; - } - } - - if (count == 0) { - return RangeResponse.NONE; - } else if (count == (maxTxnId - minTxnId + 1)) { - return RangeResponse.ALL; - } else { - return RangeResponse.SOME; - } - } -} - http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java deleted file mode 100644 index 108e5ca..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common; - -/** - * Models the list of transactions that should be included in a snapshot. - * It is modelled as a high water mark, which is the largest transaction id that - * has been committed and a list of transactions that are not included. - */ -public interface ValidTxnList { - - /** - * Key used to store valid txn list in a - * {@link org.apache.hadoop.conf.Configuration} object. - */ - public static final String VALID_TXNS_KEY = "hive.txn.valid.txns"; - - /** - * The response to a range query. NONE means no values in this range match, - * SOME mean that some do, and ALL means that every value does. - */ - public enum RangeResponse {NONE, SOME, ALL}; - - /** - * Indicates whether a given transaction is valid. Note that valid may have different meanings - * for different implementations, as some will only want to see committed transactions and some - * both committed and aborted. - * @param txnid id for the transaction - * @return true if valid, false otherwise - */ - public boolean isTxnValid(long txnid); - - /** - * Returns {@code true} if such base file can be used to materialize the snapshot represented by - * this {@code ValidTxnList}. - * @param txnid highest txn in a given base_xxxx file - */ - public boolean isValidBase(long txnid); - - /** - * Find out if a range of transaction ids are valid. Note that valid may have different meanings - * for different implementations, as some will only want to see committed transactions and some - * both committed and aborted. - * @param minTxnId minimum txnid to look for, inclusive - * @param maxTxnId maximum txnid to look for, inclusive - * @return Indicate whether none, some, or all of these transactions are valid. - */ - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId); - - /** - * Write this validTxnList into a string. This should produce a string that - * can be used by {@link #readFromString(String)} to populate a validTxnsList. - */ - public String writeToString(); - - /** - * Populate this validTxnList from the string. It is assumed that the string - * was created via {@link #writeToString()} and the exceptions list is sorted. - * @param src source string. - */ - public void readFromString(String src); - - /** - * Get the largest transaction id used. - * @return largest transaction id used - */ - public long getHighWatermark(); - - /** - * Get the list of transactions under the high water mark that are not valid. Note that invalid - * may have different meanings for different implementations, as some will only want to see open - * transactions and some both open and aborted. - * @return a list of invalid transaction ids - */ - public long[] getInvalidTransactions(); - - /** - * Indicates whether a given transaction is aborted. - * @param txnid id for the transaction - * @return true if aborted, false otherwise - */ - public boolean isTxnAborted(long txnid); - - /** - * Find out if a range of transaction ids are aborted. - * @param minTxnId minimum txnid to look for, inclusive - * @param maxTxnId maximum txnid to look for, inclusive - * @return Indicate whether none, some, or all of these transactions are aborted. - */ - public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId); - - /** - * Returns smallest Open transaction in this set, {@code null} if there is none. - */ - Long getMinOpenTxn(); -} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java deleted file mode 100644 index 00ee820..0000000 --- a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common; - -import junit.framework.Assert; -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.util.BitSet; - -/** - * Tests for {@link ValidReadTxnList} - */ -public class TestValidReadTxnList { - - @Test - public void noExceptions() throws Exception { - ValidTxnList txnList = new ValidReadTxnList(new long[0], new BitSet(), 1, Long.MAX_VALUE); - String str = txnList.writeToString(); - Assert.assertEquals("1:" + Long.MAX_VALUE + "::", str); - ValidTxnList newList = new ValidReadTxnList(); - newList.readFromString(str); - Assert.assertTrue(newList.isTxnValid(1)); - Assert.assertFalse(newList.isTxnValid(2)); - } - - @Test - public void exceptions() throws Exception { - ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, new BitSet(), 5, 4L); - String str = txnList.writeToString(); - Assert.assertEquals("5:4:2,4:", str); - ValidTxnList newList = new ValidReadTxnList(); - newList.readFromString(str); - Assert.assertTrue(newList.isTxnValid(1)); - Assert.assertFalse(newList.isTxnValid(2)); - Assert.assertTrue(newList.isTxnValid(3)); - Assert.assertFalse(newList.isTxnValid(4)); - Assert.assertTrue(newList.isTxnValid(5)); - Assert.assertFalse(newList.isTxnValid(6)); - } - - @Test - public void longEnoughToCompress() throws Exception { - long[] exceptions = new long[1000]; - for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; - ValidTxnList txnList = new ValidReadTxnList(exceptions, new BitSet(), 2000, 900); - String str = txnList.writeToString(); - ValidTxnList newList = new ValidReadTxnList(); - newList.readFromString(str); - for (int i = 0; i < 100; i++) Assert.assertTrue(newList.isTxnValid(i)); - for (int i = 100; i < 1100; i++) Assert.assertFalse(newList.isTxnValid(i)); - for (int i = 1100; i < 2001; i++) Assert.assertTrue(newList.isTxnValid(i)); - Assert.assertFalse(newList.isTxnValid(2001)); - } - - @Test - public void readWriteConfig() throws Exception { - long[] exceptions = new long[1000]; - for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; - ValidTxnList txnList = new ValidReadTxnList(exceptions, new BitSet(), 2000, 900); - String str = txnList.writeToString(); - Configuration conf = new Configuration(); - conf.set(ValidTxnList.VALID_TXNS_KEY, str); - File tmpFile = File.createTempFile("TestValidTxnImpl", "readWriteConfig"); - DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpFile)); - conf.write(out); - out.close(); - DataInputStream in = new DataInputStream(new FileInputStream(tmpFile)); - Configuration newConf = new Configuration(); - newConf.readFields(in); - Assert.assertEquals(str, newConf.get(ValidTxnList.VALID_TXNS_KEY)); - } - - @Test - public void testAbortedTxn() throws Exception { - long[] exceptions = {2L, 4L, 6L, 8L, 10L}; - BitSet bitSet = new BitSet(exceptions.length); - bitSet.set(0); // mark txn "2L" aborted - bitSet.set(3); // mark txn "8L" aborted - ValidTxnList txnList = new ValidReadTxnList(exceptions, bitSet, 11, 4L); - String str = txnList.writeToString(); - Assert.assertEquals("11:4:4,6,10:2,8", str); - Assert.assertTrue(txnList.isTxnAborted(2L)); - Assert.assertFalse(txnList.isTxnAborted(4L)); - Assert.assertFalse(txnList.isTxnAborted(6L)); - Assert.assertTrue(txnList.isTxnAborted(8L)); - Assert.assertFalse(txnList.isTxnAborted(10L)); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 921bbd3..3b5066f 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -211,8 +211,8 @@ public class TestStreaming { //1) Start from a clean slate (metastore) - TxnDbUtil.cleanDb(); - TxnDbUtil.prepDb(); + TxnDbUtil.cleanDb(conf); + TxnDbUtil.prepDb(conf); //2) obtain metastore clients msClient = new HiveMetaStoreClient(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java index 87a72b5..63690f9 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingTestUtils.java @@ -66,8 +66,8 @@ public class StreamingTestUtils { public void prepareTransactionDatabase(HiveConf conf) throws Exception { TxnDbUtil.setConfValues(conf); - TxnDbUtil.cleanDb(); - TxnDbUtil.prepDb(); + TxnDbUtil.cleanDb(conf); + TxnDbUtil.prepDb(conf); } public IMetaStoreClient newMetaStoreClient(HiveConf conf) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index 1002be7..a19cc86 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -259,12 +259,12 @@ public class TestHiveMetaStoreTxns { @Before public void setUp() throws Exception { - TxnDbUtil.prepDb(); + TxnDbUtil.prepDb(conf); client = new HiveMetaStoreClient(conf); } @After public void tearDown() throws Exception { - TxnDbUtil.cleanDb(); + TxnDbUtil.cleanDb(conf); } } http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index ea0aadf..dbfc235 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -96,7 +96,6 @@ public class TestAcidOnTez { @Before public void setUp() throws Exception { - tearDown(); hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); @@ -107,7 +106,7 @@ public class TestAcidOnTez { .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); TxnDbUtil.setConfValues(hiveConf); - TxnDbUtil.prepDb(); + TxnDbUtil.prepDb(hiveConf); File f = new File(TEST_WAREHOUSE_DIR); if (f.exists()) { FileUtil.fullyDelete(f); @@ -152,7 +151,7 @@ public class TestAcidOnTez { d.close(); d = null; } - TxnDbUtil.cleanDb(); + TxnDbUtil.cleanDb(hiveConf); } finally { FileUtils.deleteDirectory(new File(TEST_DATA_DIR)); } http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index aea1dfc..707bcd1 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -118,8 +118,8 @@ public class TestCompactor { //"org.apache.hadoop.hive.ql.io.HiveInputFormat" TxnDbUtil.setConfValues(hiveConf); - TxnDbUtil.cleanDb(); - TxnDbUtil.prepDb(); + TxnDbUtil.cleanDb(hiveConf); + TxnDbUtil.prepDb(hiveConf); conf = hiveConf; msClient = new HiveMetaStoreClient(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index f4af6f4..5617e1c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -131,6 +131,7 @@ import org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask; import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager; import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport; +import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.serde2.Deserializer; @@ -7624,6 +7625,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { + e.getMessage(), e); } } + ThreadPool.shutdown(); } }); @@ -8002,6 +8004,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService")); startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService")); startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService")); + + ThreadPool.initialize(conf); + RunnableConfigurable rc = new AcidOpenTxnsCounterService(); + rc.setConf(conf); + ThreadPool.getPool().scheduleAtFixedRate(rc, 100, MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.COUNT_OPEN_TXNS_INTERVAL, TimeUnit.MILLISECONDS), + TimeUnit.MILLISECONDS); + } private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception { //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop() http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java b/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java deleted file mode 100644 index 34765b0..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.datasource; - -import com.jolbox.bonecp.BoneCPConfig; -import com.jolbox.bonecp.BoneCPDataSource; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.sql.DataSource; -import java.sql.SQLException; -import java.util.Properties; - -/** - * DataSourceProvider for the BoneCP connection pool. - */ -public class BoneCPDataSourceProvider implements DataSourceProvider { - - private static final Logger LOG = LoggerFactory.getLogger(BoneCPDataSourceProvider.class); - - public static final String BONECP = "bonecp"; - private static final String CONNECTION_TIMEOUT_PROPERTY= "bonecp.connectionTimeoutInMs"; - private static final String PARTITION_COUNT_PROPERTY= "bonecp.partitionCount"; - - @Override - public DataSource create(Configuration hdpConfig) throws SQLException { - - LOG.debug("Creating BoneCP connection pool for the MetaStore"); - - String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig); - String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig); - String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig); - int maxPoolSize = hdpConfig.getInt( - MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.varname, - ((Long)MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.defaultVal).intValue()); - - Properties properties = DataSourceProvider.getPrefixedProperties(hdpConfig, BONECP); - long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L); - String partitionCount = properties.getProperty(PARTITION_COUNT_PROPERTY, "1"); - - BoneCPConfig config = null; - try { - config = new BoneCPConfig(properties); - } catch (Exception e) { - throw new SQLException("Cannot create BoneCP configuration: ", e); - } - config.setJdbcUrl(driverUrl); - //if we are waiting for connection for a long time, something is really wrong - //better raise an error than hang forever - //see DefaultConnectionStrategy.getConnectionInternal() - config.setConnectionTimeoutInMs(connectionTimeout); - config.setMaxConnectionsPerPartition(maxPoolSize); - config.setPartitionCount(Integer.parseInt(partitionCount)); - config.setUser(user); - config.setPassword(passwd); - return new BoneCPDataSource(config); - } - - @Override - public boolean mayReturnClosedConnection() { - // See HIVE-11915 for details - return true; - } - - @Override - public boolean supports(Configuration configuration) { - String poolingType = - configuration.get( - MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE.varname).toLowerCase(); - if (BONECP.equals(poolingType)) { - int boneCpPropsNr = DataSourceProvider.getPrefixedProperties(configuration, BONECP).size(); - LOG.debug("Found " + boneCpPropsNr + " nr. of bonecp specific configurations"); - return boneCpPropsNr > 0; - } - LOG.debug("Configuration requested " + poolingType + " pooling, BoneCpDSProvider exiting"); - return false; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java b/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java deleted file mode 100644 index ad1763e..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.datasource; - -import com.google.common.collect.Iterables; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; - -import javax.sql.DataSource; -import java.io.IOException; -import java.sql.SQLException; -import java.util.Properties; - -public interface DataSourceProvider { - - /** - * @param hdpConfig - * @return the new connection pool - */ - DataSource create(Configuration hdpConfig) throws SQLException; - - /** - * BoneCp has a bug which causes closed connections to be returned to the pool - * under certain conditions. (HIVE-11915) - * @return true if the factory creates BoneCp pools which need "special attention" - */ - boolean mayReturnClosedConnection(); - - /** - * @param configuration Hadoop configuration object - * @return factory able to create a connection pool for the implementation - * specified in the configuration - */ - boolean supports(Configuration configuration); - - /** - * @param hdpConfig - * @return subset of properties prefixed by a connection pool specific substring - */ - static Properties getPrefixedProperties(Configuration hdpConfig, String factoryPrefix) { - Properties dataSourceProps = new Properties(); - Iterables.filter( - hdpConfig, (entry -> entry.getKey() != null && entry.getKey().startsWith(factoryPrefix))) - .forEach(entry -> dataSourceProps.put(entry.getKey(), entry.getValue())); - return dataSourceProps; - } - - static String getMetastoreJdbcUser(Configuration conf) { - return conf.get(MetastoreConf.ConfVars.CONNECTION_USER_NAME.varname); - } - - static String getMetastoreJdbcPasswd(Configuration conf) throws SQLException { - try { - return MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD); - } catch (IOException err) { - throw new SQLException("Error getting metastore password", err); - } - } - - static String getMetastoreJdbcDriverUrl(Configuration conf) throws SQLException { - return conf.get(MetastoreConf.ConfVars.CONNECTURLKEY.varname); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProviderFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProviderFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProviderFactory.java deleted file mode 100644 index 1eb792c..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProviderFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.datasource; - -import com.google.common.collect.ImmutableList; -import org.apache.hadoop.conf.Configuration; - -/** - * Create a DataSourceProvider for a connectionPool configured in a hadoop - * Configuration object. - */ -public abstract class DataSourceProviderFactory { - - private static final ImmutableList<DataSourceProvider> FACTORIES = - ImmutableList.<DataSourceProvider>builder().add(new HikariCPDataSourceProvider(), new BoneCPDataSourceProvider()).build(); - - /** - * @param hdpConfig hadoop configuration - * @return factory for the configured datanucleus.connectionPoolingType - */ - public static DataSourceProvider getDataSourceProvider(Configuration hdpConfig) { - - for (DataSourceProvider factory : FACTORIES) { - - if (factory.supports(hdpConfig)) { - return factory; - } - } - return null; - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java deleted file mode 100644 index 9b3d6d5..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.datasource; - -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.sql.DataSource; -import java.sql.SQLException; -import java.util.Properties; - -/** - * DataSourceProvider for the HikariCP connection pool. - */ -public class HikariCPDataSourceProvider implements DataSourceProvider { - - private static final Logger LOG = LoggerFactory.getLogger(HikariCPDataSourceProvider.class); - - public static final String HIKARI = "hikari"; - private static final String CONNECTION_TIMEOUT_PROPERTY= "hikari.connectionTimeout"; - - @Override - public DataSource create(Configuration hdpConfig) throws SQLException { - - LOG.debug("Creating Hikari connection pool for the MetaStore"); - - String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig); - String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig); - String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig); - int maxPoolSize = hdpConfig.getInt( - MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.varname, - ((Long)MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.defaultVal).intValue()); - - Properties properties = replacePrefix( - DataSourceProvider.getPrefixedProperties(hdpConfig, HIKARI)); - long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L); - HikariConfig config = null; - try { - config = new HikariConfig(properties); - } catch (Exception e) { - throw new SQLException("Cannot create HikariCP configuration: ", e); - } - config.setMaximumPoolSize(maxPoolSize); - config.setJdbcUrl(driverUrl); - config.setUsername(user); - config.setPassword(passwd); - //https://github.com/brettwooldridge/HikariCP - config.setConnectionTimeout(connectionTimeout); - return new HikariDataSource(config); - } - - @Override - public boolean mayReturnClosedConnection() { - // Only BoneCP should return true - return false; - } - - @Override - public boolean supports(Configuration configuration) { - String poolingType = - configuration.get( - MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE.varname).toLowerCase(); - if (HIKARI.equals(poolingType)) { - int hikariPropsNr = DataSourceProvider.getPrefixedProperties(configuration, HIKARI).size(); - LOG.debug("Found " + hikariPropsNr + " nr. of hikari specific configurations"); - return hikariPropsNr > 0; - } - LOG.debug("Configuration requested " + poolingType + " pooling, HikariCpDSProvider exiting"); - return false; - } - - private Properties replacePrefix(Properties props) { - Properties newProps = new Properties(); - props.forEach((key,value) -> - newProps.put(key.toString().replaceFirst(HIKARI + ".", ""), value)); - return newProps; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/package-info.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/package-info.java b/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/package-info.java deleted file mode 100644 index 86d6a26..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/datasource/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * DataSource providers that can construct a connection pool from configuration - * properties in a hadoop configuration object. - */ -package org.apache.hadoop.hive.metastore.datasource; http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java deleted file mode 100644 index 0c0bfef..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package org.apache.hadoop.hive.metastore.tools; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.DatabaseProduct; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Helper class that generates SQL queries with syntax specific to target DB - * todo: why throw MetaException? - */ -@VisibleForTesting -public final class SQLGenerator { - static final private Logger LOG = LoggerFactory.getLogger(SQLGenerator.class.getName()); - private final DatabaseProduct dbProduct; - private final HiveConf conf; - - public SQLGenerator(DatabaseProduct dbProduct, HiveConf conf) { - this.dbProduct = dbProduct; - this.conf = conf; - } - - /** - * Genereates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB - * - * @param tblColumns e.g. "T(a,b,c)" - * @param rows e.g. list of Strings like 3,4,'d' - * @return fully formed INSERT INTO ... statements - */ - public List<String> createInsertValuesStmt(String tblColumns, List<String> rows) { - if (rows == null || rows.size() == 0) { - return Collections.emptyList(); - } - List<String> insertStmts = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - switch (dbProduct) { - case ORACLE: - if (rows.size() > 1) { - //http://www.oratable.com/oracle-insert-all/ - //https://livesql.oracle.com/apex/livesql/file/content_BM1LJQ87M5CNIOKPOWPV6ZGR3.html - for (int numRows = 0; numRows < rows.size(); numRows++) { - if (numRows % conf - .getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { - if (numRows > 0) { - sb.append(" select * from dual"); - insertStmts.add(sb.toString()); - } - sb.setLength(0); - sb.append("insert all "); - } - sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows)) - .append(") "); - } - sb.append("select * from dual"); - insertStmts.add(sb.toString()); - return insertStmts; - } - //fall through - case DERBY: - case MYSQL: - case POSTGRES: - case SQLSERVER: - for (int numRows = 0; numRows < rows.size(); numRows++) { - if (numRows % conf - .getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { - if (numRows > 0) { - insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma - } - sb.setLength(0); - sb.append("insert into ").append(tblColumns).append(" values"); - } - sb.append('(').append(rows.get(numRows)).append("),"); - } - insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma - return insertStmts; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new IllegalStateException(msg); - } - } - - /** - * Given a {@code selectStatement}, decorated it with FOR UPDATE or semantically equivalent - * construct. If the DB doesn't support, return original select. - */ - public String addForUpdateClause(String selectStatement) throws MetaException { - switch (dbProduct) { - case DERBY: - //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html - //sadly in Derby, FOR UPDATE doesn't meant what it should - return selectStatement; - case MYSQL: - //http://dev.mysql.com/doc/refman/5.7/en/select.html - case ORACLE: - //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html - case POSTGRES: - //http://www.postgresql.org/docs/9.0/static/sql-select.html - return selectStatement + " for update"; - case SQLSERVER: - //https://msdn.microsoft.com/en-us/library/ms189499.aspx - //https://msdn.microsoft.com/en-us/library/ms187373.aspx - String modifier = " with (updlock)"; - int wherePos = selectStatement.toUpperCase().indexOf(" WHERE "); - if (wherePos < 0) { - return selectStatement + modifier; - } - return selectStatement.substring(0, wherePos) + modifier + - selectStatement.substring(wherePos, selectStatement.length()); - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } - } - - /** - * Suppose you have a query "select a,b from T" and you want to limit the result set - * to the first 5 rows. The mechanism to do that differs in different DBs. - * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will return the - * appropriately modified row limiting query. - * <p> - * Note that if {@code noSelectsqlQuery} contains a join, you must make sure that - * all columns are unique for Oracle. - */ - public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaException { - switch (dbProduct) { - case DERBY: - //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html - return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only"; - case MYSQL: - //http://www.postgresql.org/docs/7.3/static/queries-limit.html - case POSTGRES: - //https://dev.mysql.com/doc/refman/5.0/en/select.html - return "select " + noSelectsqlQuery + " limit " + numRows; - case ORACLE: - //newer versions (12c and later) support OFFSET/FETCH - return "select * from (select " + noSelectsqlQuery + ") where rownum <= " + numRows; - case SQLSERVER: - //newer versions (2012 and later) support OFFSET/FETCH - //https://msdn.microsoft.com/en-us/library/ms189463.aspx - return "select TOP(" + numRows + ") " + noSelectsqlQuery; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java deleted file mode 100644 index 413ce3b..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import org.apache.hadoop.hive.common.ValidCompactorTxnList; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; - -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; - -/** - * Information on a possible or running compaction. - */ -public class CompactionInfo implements Comparable<CompactionInfo> { - public long id; - public String dbname; - public String tableName; - public String partName; - char state; - public CompactionType type; - String workerId; - long start; - public String runAs; - public String properties; - public boolean tooManyAborts = false; - /** - * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) - * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition. - * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and - * {@link ValidCompactorTxnList#highWatermark} - */ - public long highestTxnId; - byte[] metaInfo; - String hadoopJobId; - - private String fullPartitionName = null; - private String fullTableName = null; - - public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) { - this.dbname = dbname; - this.tableName = tableName; - this.partName = partName; - this.type = type; - } - CompactionInfo(long id, String dbname, String tableName, String partName, char state) { - this(dbname, tableName, partName, null); - this.id = id; - this.state = state; - } - CompactionInfo() {} - - public String getFullPartitionName() { - if (fullPartitionName == null) { - StringBuilder buf = new StringBuilder(dbname); - buf.append('.'); - buf.append(tableName); - if (partName != null) { - buf.append('.'); - buf.append(partName); - } - fullPartitionName = buf.toString(); - } - return fullPartitionName; - } - - public String getFullTableName() { - if (fullTableName == null) { - StringBuilder buf = new StringBuilder(dbname); - buf.append('.'); - buf.append(tableName); - fullTableName = buf.toString(); - } - return fullTableName; - } - public boolean isMajorCompaction() { - return CompactionType.MAJOR == type; - } - - @Override - public int compareTo(CompactionInfo o) { - return getFullPartitionName().compareTo(o.getFullPartitionName()); - } - public String toString() { - return "id:" + id + "," + - "dbname:" + dbname + "," + - "tableName:" + tableName + "," + - "partName:" + partName + "," + - "state:" + state + "," + - "type:" + type + "," + - "properties:" + properties + "," + - "runAs:" + runAs + "," + - "tooManyAborts:" + tooManyAborts + "," + - "highestTxnId:" + highestTxnId; - } - - /** - * loads object from a row in Select * from COMPACTION_QUEUE - * @param rs ResultSet after call to rs.next() - * @throws SQLException - */ - static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLException { - CompactionInfo fullCi = new CompactionInfo(); - fullCi.id = rs.getLong(1); - fullCi.dbname = rs.getString(2); - fullCi.tableName = rs.getString(3); - fullCi.partName = rs.getString(4); - fullCi.state = rs.getString(5).charAt(0);//cq_state - fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0)); - fullCi.properties = rs.getString(7); - fullCi.workerId = rs.getString(8); - fullCi.start = rs.getLong(9); - fullCi.runAs = rs.getString(10); - fullCi.highestTxnId = rs.getLong(11); - fullCi.metaInfo = rs.getBytes(12); - fullCi.hadoopJobId = rs.getString(13); - return fullCi; - } - static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException { - pStmt.setLong(1, ci.id); - pStmt.setString(2, ci.dbname); - pStmt.setString(3, ci.tableName); - pStmt.setString(4, ci.partName); - pStmt.setString(5, Character.toString(ci.state)); - pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type))); - pStmt.setString(7, ci.properties); - pStmt.setString(8, ci.workerId); - pStmt.setLong(9, ci.start); - pStmt.setLong(10, endTime); - pStmt.setString(11, ci.runAs); - pStmt.setLong(12, ci.highestTxnId); - pStmt.setBytes(13, ci.metaInfo); - pStmt.setString(14, ci.hadoopJobId); - } -}
