Repository: hive
Updated Branches:
  refs/heads/branch-1 39decb0bf -> 293e22e0e


HIVE-13392 disable speculative execution for ACID Compactor (Eugene Koifman, 
reviewed by Wei Zheng, Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/293e22e0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/293e22e0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/293e22e0

Branch: refs/heads/branch-1
Commit: 293e22e0eed47ec3f7e0ce4d981366c59b65455c
Parents: 39decb0
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Mon Jun 13 11:41:30 2016 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Mon Jun 13 11:41:30 2016 -0700

----------------------------------------------------------------------
 .../hive/common/ValidCompactorTxnList.java      | 111 +++++++++++++++++++
 .../hive/metastore/txn/CompactionInfo.java      |   1 +
 .../hadoop/hive/metastore/txn/TxnUtils.java     |   1 +
 .../metastore/txn/ValidCompactorTxnList.java    | 111 -------------------
 .../txn/TestValidCompactorTxnList.java          |   1 +
 .../hive/ql/txn/compactor/CompactorMR.java      |   8 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |   2 +-
 7 files changed, 121 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/293e22e0/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
new file mode 100644
index 0000000..ad79e2c
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+
+import java.util.Arrays;
+
+/**
+ * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
+ * is committed or aborted.  Additionally it will return none if there are any open transactions
+ * below the max transaction given, since we don't want to compact above open transactions.  For
+ * {@link #isTxnValid} it will still view a transaction as valid only if it is committed.  These
+ * produce the logic we need to assure that the compactor only sees records less than the lowest
+ * open transaction when choosing which files to compact, but that it still ignores aborted
+ * records when compacting.
+ */
+public class ValidCompactorTxnList extends ValidReadTxnList {
+  //TODO: refactor this - minOpenTxn is not needed if we set
+  // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns)
+
+  // The minimum open transaction id
+  private long minOpenTxn;
+
+  public ValidCompactorTxnList() {
+    super();
+    minOpenTxn = -1;
+  }
+
+  /**
+   *
+   * @param exceptions list of all open and aborted transactions
+   * @param minOpen lowest open transaction
+   * @param highWatermark highest committed transaction
+   */
+  public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) {
+    super(exceptions, highWatermark);
+    minOpenTxn = minOpen;
+  }
+
+  public ValidCompactorTxnList(String value) {
+    super(value);
+  }
+
+  @Override
+  public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+    if (highWatermark < minTxnId) {
+      return RangeResponse.NONE;
+    } else if (minOpenTxn < 0) {
+      return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
+    } else {
+      return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
+    }
+  }
+
+  @Override
+  public String writeToString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append(highWatermark);
+    buf.append(':');
+    buf.append(minOpenTxn);
+    if (exceptions.length == 0) {
+      buf.append(':');
+    } else {
+      for(long except: exceptions) {
+        buf.append(':');
+        buf.append(except);
+      }
+    }
+    return buf.toString();
+  }
+
+  @Override
+  public void readFromString(String src) {
+    if (src == null || src.length() == 0) {
+      highWatermark = Long.MAX_VALUE;
+      exceptions = new long[0];
+    } else {
+      String[] values = src.split(":");
+      highWatermark = Long.parseLong(values[0]);
+      minOpenTxn = Long.parseLong(values[1]);
+      exceptions = new long[values.length - 2];
+      for(int i = 2; i < values.length; ++i) {
+        exceptions[i-2] = Long.parseLong(values[i]);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public long getMinOpenTxn() {
+    return minOpenTxn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/293e22e0/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 85e0885..413ce3b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/293e22e0/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 5391fb0..cef84c7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;

http://git-wip-us.apache.org/repos/asf/hive/blob/293e22e0/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
deleted file mode 100644
index 30bdfa7..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore.txn;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-
-import java.util.Arrays;
-
-/**
- * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
- * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
- * is committed or aborted.  Additionally it will return none if there are any open transactions
- * below the max transaction given, since we don't want to compact above open transactions.  For
- * {@link #isTxnValid} it will still view a transaction as valid only if it is committed.  These
- * produce the logic we need to assure that the compactor only sees records less than the lowest
- * open transaction when choosing which files to compact, but that it still ignores aborted
- * records when compacting.
- */
-public class ValidCompactorTxnList extends ValidReadTxnList {
-  //TODO: refactor this - minOpenTxn is not needed if we set
-  // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns)
-
-  // The minimum open transaction id
-  private long minOpenTxn;
-
-  public ValidCompactorTxnList() {
-    super();
-    minOpenTxn = -1;
-  }
-
-  /**
-   *
-   * @param exceptions list of all open and aborted transactions
-   * @param minOpen lowest open transaction
-   * @param highWatermark highest committed transaction
-   */
-  public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) {
-    super(exceptions, highWatermark);
-    minOpenTxn = minOpen;
-  }
-
-  public ValidCompactorTxnList(String value) {
-    super(value);
-  }
-
-  @Override
-  public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
-    if (highWatermark < minTxnId) {
-      return RangeResponse.NONE;
-    } else if (minOpenTxn < 0) {
-      return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
-    } else {
-      return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
-    }
-  }
-
-  @Override
-  public String writeToString() {
-    StringBuilder buf = new StringBuilder();
-    buf.append(highWatermark);
-    buf.append(':');
-    buf.append(minOpenTxn);
-    if (exceptions.length == 0) {
-      buf.append(':');
-    } else {
-      for(long except: exceptions) {
-        buf.append(':');
-        buf.append(except);
-      }
-    }
-    return buf.toString();
-  }
-
-  @Override
-  public void readFromString(String src) {
-    if (src == null || src.length() == 0) {
-      highWatermark = Long.MAX_VALUE;
-      exceptions = new long[0];
-    } else {
-      String[] values = src.split(":");
-      highWatermark = Long.parseLong(values[0]);
-      minOpenTxn = Long.parseLong(values[1]);
-      exceptions = new long[values.length - 2];
-      for(int i = 2; i < values.length; ++i) {
-        exceptions[i-2] = Long.parseLong(values[i]);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  long getMinOpenTxn() {
-    return minOpenTxn;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/293e22e0/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
index c0923eb..c249854 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/hive/blob/293e22e0/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 113ba65..3e7bb93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StringableMap;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -132,6 +132,10 @@ public class CompactorMR {
       overrideTblProps(job, t.getParameters(), ci.properties);
     }
     setColumnTypes(job, sd.getCols());
+    //with feature on, multiple tasks may get into conflict creating/using TMP_LOCATION and if we were
+    //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
+    //to do the final move
+    job.setBoolean("mapreduce.map.speculative", false);
     return job;
   }
 
@@ -622,7 +626,7 @@ public class CompactorMR {
         AcidInputFormat<WritableComparable, V> aif =
         instantiate(AcidInputFormat.class, jobConf.get(INPUT_FORMAT_CLASS_NAME));
       ValidTxnList txnList =
-        new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+        new ValidCompactorTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
 
       boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
       AcidInputFormat.RawReader<V> reader =

http://git-wip-us.apache.org/repos/asf/hive/blob/293e22e0/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index af70f0c..3172723 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.io;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;

Reply via email to