http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java b/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java new file mode 100644 index 0000000..64f2819 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +import static org.apache.hadoop.hive.common.JvmMetricsInfo.*; + +import org.apache.hadoop.log.metrics.EventCounter; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +/** + * JVM and logging related metrics. Ported from Hadoop JvmMetrics. + * Mostly used by various servers as a part of the metrics they export. 
+ */ +public class JvmMetrics implements MetricsSource { + enum Singleton { + INSTANCE; + + JvmMetrics impl; + + synchronized JvmMetrics init(String processName, String sessionId) { + if (impl == null) { + impl = create(processName, sessionId, DefaultMetricsSystem.instance()); + } + return impl; + } + } + + static final float M = 1024*1024; + + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + final List<GarbageCollectorMXBean> gcBeans = + ManagementFactory.getGarbageCollectorMXBeans(); + final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + final String processName, sessionId; + private JvmPauseMonitor pauseMonitor = null; + final ConcurrentHashMap<String, MetricsInfo[]> gcInfoCache = + new ConcurrentHashMap<String, MetricsInfo[]>(); + + JvmMetrics(String processName, String sessionId) { + this.processName = processName; + this.sessionId = sessionId; + } + + public void setPauseMonitor(final JvmPauseMonitor pauseMonitor) { + this.pauseMonitor = pauseMonitor; + } + + public static JvmMetrics create(String processName, String sessionId, MetricsSystem ms) { + return ms.register(JvmMetrics.name(), JvmMetrics.description(), + new JvmMetrics(processName, sessionId)); + } + + public static JvmMetrics initSingleton(String processName, String sessionId) { + return Singleton.INSTANCE.init(processName, sessionId); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder rb = collector.addRecord(JvmMetrics) + .setContext("jvm").tag(ProcessName, processName) + .tag(SessionId, sessionId); + getMemoryUsage(rb); + getGcUsage(rb); + getThreadUsage(rb); + getEventCounters(rb); + } + + private void getMemoryUsage(MetricsRecordBuilder rb) { + MemoryUsage memNonHeap = memoryMXBean.getNonHeapMemoryUsage(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + Runtime runtime = Runtime.getRuntime(); + rb.addGauge(MemNonHeapUsedM, memNonHeap.getUsed() / M) + .addGauge(MemNonHeapCommittedM, memNonHeap.getCommitted() / M) + .addGauge(MemNonHeapMaxM, memNonHeap.getMax() / M) + .addGauge(MemHeapUsedM, memHeap.getUsed() / M) + .addGauge(MemHeapCommittedM, memHeap.getCommitted() / M) + .addGauge(MemHeapMaxM, memHeap.getMax() / M) + .addGauge(MemMaxM, runtime.maxMemory() / M); + } + + private void getGcUsage(MetricsRecordBuilder rb) { + long count = 0; + long timeMillis = 0; + for (GarbageCollectorMXBean gcBean : gcBeans) { + long c = gcBean.getCollectionCount(); + long t = gcBean.getCollectionTime(); + MetricsInfo[] gcInfo = getGcInfo(gcBean.getName()); + rb.addCounter(gcInfo[0], c).addCounter(gcInfo[1], t); + count += c; + timeMillis += t; + } + rb.addCounter(GcCount, count) + .addCounter(GcTimeMillis, timeMillis); + + if (pauseMonitor != null) { + rb.addCounter(GcNumWarnThresholdExceeded, + pauseMonitor.getNumGcWarnThreadholdExceeded()); + rb.addCounter(GcNumInfoThresholdExceeded, + pauseMonitor.getNumGcInfoThresholdExceeded()); + rb.addCounter(GcTotalExtraSleepTime, + pauseMonitor.getTotalGcExtraSleepTime()); + } + } + + private MetricsInfo[] getGcInfo(String gcName) { + MetricsInfo[] gcInfo = gcInfoCache.get(gcName); + if (gcInfo == null) { + gcInfo = new MetricsInfo[2]; + gcInfo[0] = Interns.info("GcCount" + gcName, "GC Count for " + gcName); + gcInfo[1] = Interns + .info("GcTimeMillis" + gcName, "GC Time for " + gcName); + MetricsInfo[] previousGcInfo = gcInfoCache.putIfAbsent(gcName, gcInfo); + if (previousGcInfo != null) { + return previousGcInfo; + } + } + return gcInfo; + } + + private void 
getThreadUsage(MetricsRecordBuilder rb) { + int threadsNew = 0; + int threadsRunnable = 0; + int threadsBlocked = 0; + int threadsWaiting = 0; + int threadsTimedWaiting = 0; + int threadsTerminated = 0; + long threadIds[] = threadMXBean.getAllThreadIds(); + for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(threadIds, 0)) { + if (threadInfo == null) continue; // race protection + switch (threadInfo.getThreadState()) { + case NEW: threadsNew++; break; + case RUNNABLE: threadsRunnable++; break; + case BLOCKED: threadsBlocked++; break; + case WAITING: threadsWaiting++; break; + case TIMED_WAITING: threadsTimedWaiting++; break; + case TERMINATED: threadsTerminated++; break; + } + } + rb.addGauge(ThreadsNew, threadsNew) + .addGauge(ThreadsRunnable, threadsRunnable) + .addGauge(ThreadsBlocked, threadsBlocked) + .addGauge(ThreadsWaiting, threadsWaiting) + .addGauge(ThreadsTimedWaiting, threadsTimedWaiting) + .addGauge(ThreadsTerminated, threadsTerminated); + } + + private void getEventCounters(MetricsRecordBuilder rb) { + rb.addCounter(LogFatal, EventCounter.getFatal()) + .addCounter(LogError, EventCounter.getError()) + .addCounter(LogWarn, EventCounter.getWarn()) + .addCounter(LogInfo, EventCounter.getInfo()); + } +}
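For context, a minimal wiring sketch (hypothetical caller code, not part of this patch) showing how a server process could expose this source. The JvmPauseMonitor constructor and start() call are assumed to match the Hadoop class this file was ported from; "hiveserver2" and sessionId are placeholder values.

    // Hypothetical usage sketch, assuming a Hadoop-style JvmPauseMonitor(conf) and start().
    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("hiveserver2", sessionId);
    JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
    jvmMetrics.setPauseMonitor(pauseMonitor);
    // With the pause monitor attached, getMetrics() also reports GcNumWarnThresholdExceeded,
    // GcNumInfoThresholdExceeded and GcTotalExtraSleepTime alongside the memory, GC,
    // thread and log-event metrics.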
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/JvmMetricsInfo.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmMetricsInfo.java b/common/src/java/org/apache/hadoop/hive/common/JvmMetricsInfo.java new file mode 100644 index 0000000..3ab73c5 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/JvmMetricsInfo.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +import com.google.common.base.Objects; + +import org.apache.hadoop.metrics2.MetricsInfo; + +/** + * JVM and logging related metrics info instances. Ported from Hadoop JvmMetricsInfo. + */ +public enum JvmMetricsInfo implements MetricsInfo { + JvmMetrics("JVM related metrics etc."), // record info + // metrics + MemNonHeapUsedM("Non-heap memory used in MB"), + MemNonHeapCommittedM("Non-heap memory committed in MB"), + MemNonHeapMaxM("Non-heap memory max in MB"), + MemHeapUsedM("Heap memory used in MB"), + MemHeapCommittedM("Heap memory committed in MB"), + MemHeapMaxM("Heap memory max in MB"), + MemMaxM("Max memory size in MB"), + GcCount("Total GC count"), + GcTimeMillis("Total GC time in milliseconds"), + ThreadsNew("Number of new threads"), + ThreadsRunnable("Number of runnable threads"), + ThreadsBlocked("Number of blocked threads"), + ThreadsWaiting("Number of waiting threads"), + ThreadsTimedWaiting("Number of timed waiting threads"), + ThreadsTerminated("Number of terminated threads"), + LogFatal("Total number of fatal log events"), + LogError("Total number of error log events"), + LogWarn("Total number of warning log events"), + LogInfo("Total number of info log events"), + GcNumWarnThresholdExceeded("Number of times that the GC warn threshold is exceeded"), + GcNumInfoThresholdExceeded("Number of times that the GC info threshold is exceeded"), + GcTotalExtraSleepTime("Total GC extra sleep time in milliseconds"); + + private final String desc; + + JvmMetricsInfo(String desc) { this.desc = desc; } + + @Override public String description() { return desc; } + + @Override public String toString() { + return Objects.toStringHelper(this) + .add("name", name()).add("description", desc) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/LogUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java index c2a0d9a..83f3af7 100644 --- a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java +++ 
b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java @@ -25,11 +25,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configurator; import org.apache.logging.log4j.core.impl.Log4jContextFactory; +import org.apache.logging.log4j.spi.DefaultThreadContextMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import com.google.common.annotations.VisibleForTesting; @@ -45,8 +46,15 @@ public class LogUtils { /** * Constants for log masking */ - private static String KEY_TO_MASK_WITH = "password"; - private static String MASKED_VALUE = "###_MASKED_###"; + private static final String KEY_TO_MASK_WITH = "password"; + private static final String MASKED_VALUE = "###_MASKED_###"; + + /** + * Constants of the key strings for the logging ThreadContext. + */ + public static final String SESSIONID_LOG_KEY = "sessionId"; + public static final String QUERYID_LOG_KEY = "queryId"; + public static final String OPERATIONLOG_LEVEL_KEY = "operationLogLevel"; @SuppressWarnings("serial") public static class LogInitializationException extends Exception { @@ -110,6 +118,8 @@ public class LogUtils { System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId); } final boolean async = checkAndSetAsyncLogging(conf); + // required for MDC based routing appender so that child threads can inherit the MDC context + System.setProperty(DefaultThreadContextMap.INHERITABLE_MAP, "true"); Configurator.initialize(null, log4jFileName); logConfigLocation(conf); return "Logging initialized using configuration in " + log4jConfigFile + " Async: " + async; @@ -152,6 +162,7 @@ public class LogUtils { } if (hive_l4j != null) { final boolean async = checkAndSetAsyncLogging(conf); + System.setProperty(DefaultThreadContextMap.INHERITABLE_MAP, "true"); Configurator.initialize(null, hive_l4j.toString()); logConfigLocation(conf); return (logMessage + "\n" + "Logging initialized using configuration in " + hive_l4j + @@ -193,4 +204,22 @@ public class LogUtils { } return value; } + + /** + * Register logging context so that log system can print QueryId, SessionId, etc for each message + */ + public static void registerLoggingContext(Configuration conf) { + MDC.put(SESSIONID_LOG_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVESESSIONID)); + MDC.put(QUERYID_LOG_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID)); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + MDC.put(OPERATIONLOG_LEVEL_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL)); + } + } + + /** + * Unregister logging context + */ + public static void unregisterLoggingContext() { + MDC.clear(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java new file mode 100644 index 0000000..36ae56f --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +/** + * Interface that can be used to provide size estimates based on data structures held in memory for an object instance. + */ +public interface MemoryEstimate { + /** + * Returns the estimated memory size based on {@link org.apache.hadoop.hive.ql.util.JavaDataModel}. + * + * @return estimated memory size in bytes + */ + long getEstimatedMemorySize(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java b/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java index 926b4a6..a9e17c2 100644 --- a/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java +++ b/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java @@ -49,7 +49,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; public class StatsSetupConst { - protected final static Logger LOG = LoggerFactory.getLogger(StatsSetupConst.class.getName()); + protected static final Logger LOG = LoggerFactory.getLogger(StatsSetupConst.class.getName()); public enum StatDB { fs { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java b/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java index c729991..92d37e8 100644 --- a/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java @@ -104,13 +104,21 @@ * This method interns all the strings in the given list in place. That is, * it iterates over the list, replaces each element with the interned copy * and eventually returns the same list. + * + * Note that the provided List implementation should return, via + * list.listIterator(), a ListIterator that implements + * the set(Object) method. That's what all List implementations in the JDK + * provide. However, if some custom List implementation doesn't have this + * functionality, this method will return without interning its elements. 
*/ public static List<String> internStringsInList(List<String> list) { if (list != null) { - ListIterator<String> it = list.listIterator(); - while (it.hasNext()) { - it.set(it.next().intern()); - } + try { + ListIterator<String> it = list.listIterator(); + while (it.hasNext()) { + it.set(it.next().intern()); + } + } catch (UnsupportedOperationException e) { } // set() not implemented - ignore } return list; } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java index 334b93e..8f55354 100644 --- a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java +++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.common; import java.util.Arrays; +import java.util.BitSet; /** * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. @@ -40,11 +41,12 @@ public class ValidCompactorTxnList extends ValidReadTxnList { } /** * @param abortedTxnList list of all aborted transactions + * @param abortedBits bitset marking whether the corresponding transaction is aborted * @param highWatermark highest committed transaction to be considered for compaction, * equivalently (lowest_open_txn - 1). */ - public ValidCompactorTxnList(long[] abortedTxnList, long highWatermark) { - super(abortedTxnList, highWatermark); + public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) { + super(abortedTxnList, abortedBits, highWatermark); // abortedBits should be all true as everything in exceptions are aborted txns if(this.exceptions.length <= 0) { return; } @@ -75,4 +77,9 @@ public class ValidCompactorTxnList extends ValidReadTxnList { public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; } + + @Override + public boolean isTxnAborted(long txnid) { + return Arrays.binarySearch(exceptions, txnid) >= 0; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java index 2f35917..4e57772 100644 --- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java +++ b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.common; import com.google.common.annotations.VisibleForTesting; import java.util.Arrays; +import java.util.BitSet; /** * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers. @@ -30,32 +31,27 @@ import java.util.Arrays; public class ValidReadTxnList implements ValidTxnList { protected long[] exceptions; + protected BitSet abortedBits; // BitSet for flagging aborted transactions. 
Bit is true if aborted, false if open //default value means there are no open txn in the snapshot private long minOpenTxn = Long.MAX_VALUE; protected long highWatermark; public ValidReadTxnList() { - this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE); + this(new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE); } /** * Used if there are no open transactions in the snapshot */ - public ValidReadTxnList(long[] exceptions, long highWatermark) { - this(exceptions, highWatermark, Long.MAX_VALUE); + public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark) { + this(exceptions, abortedBits, highWatermark, Long.MAX_VALUE); } - public ValidReadTxnList(long[] exceptions, long highWatermark, long minOpenTxn) { - if (exceptions.length == 0) { - this.exceptions = exceptions; - } else { - this.exceptions = exceptions.clone(); - Arrays.sort(this.exceptions); + public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark, long minOpenTxn) { + if (exceptions.length > 0) { this.minOpenTxn = minOpenTxn; - if(this.exceptions[0] <= 0) { - //should never happen of course - throw new IllegalArgumentException("Invalid txnid: " + this.exceptions[0] + " found"); - } } + this.exceptions = exceptions; + this.abortedBits = abortedBits; this.highWatermark = highWatermark; } @@ -118,12 +114,28 @@ public class ValidReadTxnList implements ValidTxnList { buf.append(':'); buf.append(minOpenTxn); if (exceptions.length == 0) { - buf.append(':'); + buf.append(':'); // separator for open txns + buf.append(':'); // separator for aborted txns } else { - for(long except: exceptions) { - buf.append(':'); - buf.append(except); + StringBuilder open = new StringBuilder(); + StringBuilder abort = new StringBuilder(); + for (int i = 0; i < exceptions.length; i++) { + if (abortedBits.get(i)) { + if (abort.length() > 0) { + abort.append(','); + } + abort.append(exceptions[i]); + } else { + if (open.length() > 0) { + open.append(','); + } + open.append(exceptions[i]); + } } + buf.append(':'); + buf.append(open); + buf.append(':'); + buf.append(abort); } return buf.toString(); } @@ -133,13 +145,41 @@ public class ValidReadTxnList implements ValidTxnList { if (src == null || src.length() == 0) { highWatermark = Long.MAX_VALUE; exceptions = new long[0]; + abortedBits = new BitSet(); } else { String[] values = src.split(":"); highWatermark = Long.parseLong(values[0]); minOpenTxn = Long.parseLong(values[1]); - exceptions = new long[values.length - 2]; - for(int i = 2; i < values.length; ++i) { - exceptions[i-2] = Long.parseLong(values[i]); + String[] openTxns = new String[0]; + String[] abortedTxns = new String[0]; + if (values.length < 3) { + openTxns = new String[0]; + abortedTxns = new String[0]; + } else if (values.length == 3) { + if (!values[2].isEmpty()) { + openTxns = values[2].split(","); + } + } else { + if (!values[2].isEmpty()) { + openTxns = values[2].split(","); + } + if (!values[3].isEmpty()) { + abortedTxns = values[3].split(","); + } + } + exceptions = new long[openTxns.length + abortedTxns.length]; + int i = 0; + for (String open : openTxns) { + exceptions[i++] = Long.parseLong(open); + } + for (String abort : abortedTxns) { + exceptions[i++] = Long.parseLong(abort); + } + Arrays.sort(exceptions); + abortedBits = new BitSet(exceptions.length); + for (String abort : abortedTxns) { + int index = Arrays.binarySearch(exceptions, Long.parseLong(abort)); + abortedBits.set(index); } } } @@ -157,5 +197,40 @@ public class ValidReadTxnList implements ValidTxnList { public long 
getMinOpenTxn() { return minOpenTxn; } + + @Override + public boolean isTxnAborted(long txnid) { + int index = Arrays.binarySearch(exceptions, txnid); + return index >= 0 && abortedBits.get(index); + } + + @Override + public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) { + // check the easy cases first + if (highWatermark < minTxnId) { + return RangeResponse.NONE; + } + + int count = 0; // number of aborted txns found in exceptions + + // traverse the aborted txns list, starting at first aborted txn index + for (int i = abortedBits.nextSetBit(0); i >= 0; i = abortedBits.nextSetBit(i + 1)) { + long abortedTxnId = exceptions[i]; + if (abortedTxnId > maxTxnId) { // we've already gone beyond the specified range + break; + } + if (abortedTxnId >= minTxnId && abortedTxnId <= maxTxnId) { + count++; + } + } + + if (count == 0) { + return RangeResponse.NONE; + } else if (count == (maxTxnId - minTxnId + 1)) { + return RangeResponse.ALL; + } else { + return RangeResponse.SOME; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java index 5e1e4ee..d4ac02c 100644 --- a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java +++ b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java @@ -71,7 +71,7 @@ public interface ValidTxnList { /** * Populate this validTxnList from the string. It is assumed that the string - * was created via {@link #writeToString()}. + * was created via {@link #writeToString()} and the exceptions list is sorted. * @param src source string. */ public void readFromString(String src); @@ -89,4 +89,20 @@ public interface ValidTxnList { * @return a list of invalid transaction ids */ public long[] getInvalidTransactions(); + + /** + * Indicates whether a given transaction is aborted. + * @param txnid id for the transaction + * @return true if aborted, false otherwise + */ + public boolean isTxnAborted(long txnid); + + /** + * Find out if a range of transaction ids are aborted. + * @param minTxnId minimum txnid to look for, inclusive + * @param maxTxnId maximum txnid to look for, inclusive + * @return Indicate whether none, some, or all of these transactions are aborted. + */ + public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId); + } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java new file mode 100644 index 0000000..0df6f4c --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.jsonexplain; + +public final class Connection implements Comparable<Connection>{ + public final String type; + public final Vertex from; + + public Connection(String type, Vertex from) { + super(); + this.type = type; + this.from = from; + } + + @Override + public int compareTo(Connection o) { + return from.compareTo(o.from); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java new file mode 100644 index 0000000..1f01685 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.jsonexplain; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.hadoop.hive.common.jsonexplain.JsonParser; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class DagJsonParser implements JsonParser { + public final Map<String, Stage> stages = new LinkedHashMap<>(); + protected final Logger LOG; + // the objects that have been printed. + public final Set<Object> printSet = new LinkedHashSet<>(); + // the vertex that should be inlined. <Operator, list of Vertex that is + // inlined> + public final Map<Op, List<Connection>> inlineMap = new LinkedHashMap<>(); + + public DagJsonParser() { + super(); + LOG = LoggerFactory.getLogger(this.getClass().getName()); + } + + public void extractStagesAndPlans(JSONObject inputObject) throws Exception { + // extract stages + JSONObject dependency = inputObject.getJSONObject("STAGE DEPENDENCIES"); + if (dependency != null && dependency.length() > 0) { + // iterate for the first time to get all the names of stages. 
+ for (String stageName : JSONObject.getNames(dependency)) { + this.stages.put(stageName, new Stage(stageName, this)); + } + // iterate for the second time to get all the dependency. + for (String stageName : JSONObject.getNames(dependency)) { + JSONObject dependentStageNames = dependency.getJSONObject(stageName); + this.stages.get(stageName).addDependency(dependentStageNames, this.stages); + } + } + // extract stage plans + JSONObject stagePlans = inputObject.getJSONObject("STAGE PLANS"); + if (stagePlans != null && stagePlans.length() > 0) { + for (String stageName : JSONObject.getNames(stagePlans)) { + JSONObject stagePlan = stagePlans.getJSONObject(stageName); + this.stages.get(stageName).extractVertex(stagePlan); + } + } + } + + /** + * @param indentFlag + * help to generate correct indent + * @return + */ + public static String prefixString(int indentFlag) { + StringBuilder sb = new StringBuilder(); + for (int index = 0; index < indentFlag; index++) { + sb.append(" "); + } + return sb.toString(); + } + + /** + * @param indentFlag + * @param tail + * help to generate correct indent with a specific tail + * @return + */ + public static String prefixString(int indentFlag, String tail) { + StringBuilder sb = new StringBuilder(); + for (int index = 0; index < indentFlag; index++) { + sb.append(" "); + } + int len = sb.length(); + return sb.replace(len - tail.length(), len, tail).toString(); + } + + @Override + public void print(JSONObject inputObject, PrintStream outputStream) throws Exception { + LOG.info("JsonParser is parsing:" + inputObject.toString()); + this.extractStagesAndPlans(inputObject); + Printer printer = new Printer(); + // print out the cbo info + if (inputObject.has("cboInfo")) { + printer.println(inputObject.getString("cboInfo")); + printer.println(); + } + // print out the vertex dependency in root stage + for (Stage candidate : this.stages.values()) { + if (candidate.tezStageDependency != null && candidate.tezStageDependency.size() > 0) { + printer.println("Vertex dependency in root stage"); + for (Entry<Vertex, List<Connection>> entry : candidate.tezStageDependency.entrySet()) { + StringBuilder sb = new StringBuilder(); + sb.append(entry.getKey().name); + sb.append(" <- "); + boolean printcomma = false; + for (Connection connection : entry.getValue()) { + if (printcomma) { + sb.append(", "); + } else { + printcomma = true; + } + sb.append(connection.from.name + " (" + connection.type + ")"); + } + printer.println(sb.toString()); + } + printer.println(); + } + } + // print out all the stages that have no childStages. 
+ for (Stage candidate : this.stages.values()) { + if (candidate.childStages.isEmpty()) { + candidate.print(printer, 0); + } + } + outputStream.println(printer.toString()); + } + + public void addInline(Op op, Connection connection) { + List<Connection> list = inlineMap.get(op); + if (list == null) { + list = new ArrayList<>(); + list.add(connection); + inlineMap.put(op, list); + } else { + list.add(connection); + } + } + + public boolean isInline(Vertex v) { + for (List<Connection> list : inlineMap.values()) { + for (Connection connection : list) { + if (connection.from.equals(v)) { + return true; + } + } + } + return false; + } + + public abstract String mapEdgeType(String edgeName); + + public abstract String getFrameworkName(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java new file mode 100644 index 0000000..a518ac1 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.common.jsonexplain; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + + +public class DagJsonParserUtils { + + public static List<String> OperatorNoStats = Arrays.asList(new String[] { "File Output Operator", + "Reduce Output Operator" }); + + public static String renameReduceOutputOperator(String operatorName, Vertex vertex) { + if (operatorName.equals("Reduce Output Operator") && vertex.edgeType != null) { + return vertex.edgeType; + } else { + return operatorName; + } + } + + public static String attrsToString(Map<String, String> attrs) { + StringBuffer sb = new StringBuffer(); + boolean first = true; + for (Entry<String, String> entry : attrs.entrySet()) { + if (first) { + first = false; + } else { + sb.append(","); + } + sb.append(entry.getKey() + entry.getValue()); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java index db118bf..2a5d47a 100644 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.common.jsonexplain; +import org.apache.hadoop.hive.common.jsonexplain.spark.SparkJsonParser; import org.apache.hadoop.hive.common.jsonexplain.tez.TezJsonParser; import org.apache.hadoop.hive.conf.HiveConf; @@ -35,6 +36,9 @@ public class JsonParserFactory { if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { return new TezJsonParser(); } + if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + return new SparkJsonParser(); + } return null; } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java new file mode 100644 index 0000000..03c5981 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.common.jsonexplain; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.hadoop.hive.common.jsonexplain.Vertex.VertexType; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +public final class Op { + public final String name; + // tezJsonParser + public final DagJsonParser parser; + public final String operatorId; + public Op parent; + public final List<Op> children; + public final Map<String, String> attrs; + // the jsonObject for this operator + public final JSONObject opObject; + // the vertex that this operator belongs to + public final Vertex vertex; + // the vertex that this operator output to + public final String outputVertexName; + // the Operator type + public final OpType type; + + public enum OpType { + MAPJOIN, MERGEJOIN, RS, OTHERS + }; + + public Op(String name, String id, String outputVertexName, List<Op> children, + Map<String, String> attrs, JSONObject opObject, Vertex vertex, DagJsonParser tezJsonParser) + throws JSONException { + super(); + this.name = name; + this.operatorId = id; + this.type = deriveOpType(operatorId); + this.outputVertexName = outputVertexName; + this.children = children; + this.attrs = attrs; + this.opObject = opObject; + this.vertex = vertex; + this.parser = tezJsonParser; + } + + private OpType deriveOpType(String operatorId) { + if (operatorId != null) { + if (operatorId.startsWith(OpType.MAPJOIN.toString())) { + return OpType.MAPJOIN; + } else if (operatorId.startsWith(OpType.MERGEJOIN.toString())) { + return OpType.MERGEJOIN; + } else if (operatorId.startsWith(OpType.RS.toString())) { + return OpType.RS; + } else { + return OpType.OTHERS; + } + } else { + return OpType.OTHERS; + } + } + + private void inlineJoinOp() throws Exception { + // inline map join operator + if (this.type == OpType.MAPJOIN) { + JSONObject joinObj = opObject.getJSONObject(this.name); + // get the map for posToVertex + Map<String, Vertex> posToVertex = new LinkedHashMap<>(); + if (joinObj.has("input vertices:")) { + JSONObject verticeObj = joinObj.getJSONObject("input vertices:"); + for (String pos : JSONObject.getNames(verticeObj)) { + String vertexName = verticeObj.getString(pos); + // update the connection + Connection c = null; + for (Connection connection : vertex.parentConnections) { + if (connection.from.name.equals(vertexName)) { + posToVertex.put(pos, connection.from); + c = connection; + break; + } + } + if (c != null) { + parser.addInline(this, c); + } + } + // update the attrs + this.attrs.remove("input vertices:"); + } + // update the keys to use operator name + JSONObject keys = joinObj.getJSONObject("keys:"); + // find out the vertex for the big table + Set<Vertex> parentVertexes = new HashSet<>(); + for (Connection connection : vertex.parentConnections) { + parentVertexes.add(connection.from); + } + parentVertexes.removeAll(posToVertex.values()); + Map<String, String> posToOpId = new LinkedHashMap<>(); + if (keys.length() != 0) { + for (String key : JSONObject.getNames(keys)) { + // first search from the posToVertex + if (posToVertex.containsKey(key)) { + Vertex v = posToVertex.get(key); + if (v.rootOps.size() == 1) { + posToOpId.put(key, v.rootOps.get(0).operatorId); + } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) { + posToOpId.put(key, v.name); + } else { + Op 
joinRSOp = v.getJoinRSOp(vertex); + if (joinRSOp != null) { + posToOpId.put(key, joinRSOp.operatorId); + } else { + throw new Exception( + "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name + + " when hive explain user is trying to identify the operator id."); + } + } + } + // then search from parent + else if (parent != null) { + posToOpId.put(key, parent.operatorId); + } + // then assume it is from its own vertex + else if (parentVertexes.size() == 1) { + Vertex v = parentVertexes.iterator().next(); + parentVertexes.clear(); + if (v.rootOps.size() == 1) { + posToOpId.put(key, v.rootOps.get(0).operatorId); + } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) { + posToOpId.put(key, v.name); + } else { + Op joinRSOp = v.getJoinRSOp(vertex); + if (joinRSOp != null) { + posToOpId.put(key, joinRSOp.operatorId); + } else { + throw new Exception( + "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name + + " when hive explain user is trying to identify the operator id."); + } + } + } + // finally throw an exception + else { + throw new Exception( + "Can not find the source operator on one of the branches of map join."); + } + } + } + this.attrs.remove("keys:"); + StringBuffer sb = new StringBuffer(); + JSONArray conditionMap = joinObj.getJSONArray("condition map:"); + for (int index = 0; index < conditionMap.length(); index++) { + JSONObject cond = conditionMap.getJSONObject(index); + String k = (String) cond.keys().next(); + JSONObject condObject = new JSONObject((String)cond.get(k)); + String type = condObject.getString("type"); + String left = condObject.getString("left"); + String right = condObject.getString("right"); + if (keys.length() != 0) { + sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "." 
+ + keys.get(right) + "(" + type + "),"); + } else { + // probably a cross product + sb.append("(" + type + "),"); + } + } + this.attrs.remove("condition map:"); + this.attrs.put("Conds:", sb.substring(0, sb.length() - 1)); + } + // should be merge join + else { + Map<String, String> posToOpId = new LinkedHashMap<>(); + if (vertex.mergeJoinDummyVertexs.size() == 0) { + if (vertex.tagToInput.size() != vertex.parentConnections.size()) { + throw new Exception("tagToInput size " + vertex.tagToInput.size() + + " is different from parentConnections size " + vertex.parentConnections.size()); + } + for (Entry<String, String> entry : vertex.tagToInput.entrySet()) { + Connection c = null; + for (Connection connection : vertex.parentConnections) { + if (connection.from.name.equals(entry.getValue())) { + Vertex v = connection.from; + if (v.rootOps.size() == 1) { + posToOpId.put(entry.getKey(), v.rootOps.get(0).operatorId); + } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) { + posToOpId.put(entry.getKey(), v.name); + } else { + Op joinRSOp = v.getJoinRSOp(vertex); + if (joinRSOp != null) { + posToOpId.put(entry.getKey(), joinRSOp.operatorId); + } else { + throw new Exception( + "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name + + " when hive explain user is trying to identify the operator id."); + } + } + c = connection; + break; + } + } + if (c == null) { + throw new Exception("Can not find " + entry.getValue() + + " while parsing keys of merge join operator"); + } + } + } else { + posToOpId.put(vertex.tag, this.parent.operatorId); + for (Vertex v : vertex.mergeJoinDummyVertexs) { + if (v.rootOps.size() != 1) { + throw new Exception("Can not find a single root operators in a single vertex " + v.name + + " when hive explain user is trying to identify the operator id."); + } + posToOpId.put(v.tag, v.rootOps.get(0).operatorId); + } + } + JSONObject joinObj = opObject.getJSONObject(this.name); + // update the keys to use operator name + JSONObject keys = joinObj.getJSONObject("keys:"); + if (keys.length() != 0) { + for (String key : JSONObject.getNames(keys)) { + if (!posToOpId.containsKey(key)) { + throw new Exception( + "Can not find the source operator on one of the branches of merge join."); + } + } + // inline merge join operator in a self-join + if (this.vertex != null) { + for (Vertex v : this.vertex.mergeJoinDummyVertexs) { + parser.addInline(this, new Connection(null, v)); + } + } + } + // update the attrs + this.attrs.remove("keys:"); + StringBuffer sb = new StringBuffer(); + JSONArray conditionMap = joinObj.getJSONArray("condition map:"); + for (int index = 0; index < conditionMap.length(); index++) { + JSONObject cond = conditionMap.getJSONObject(index); + String k = (String) cond.keys().next(); + JSONObject condObject = new JSONObject((String)cond.get(k)); + String type = condObject.getString("type"); + String left = condObject.getString("left"); + String right = condObject.getString("right"); + if (keys.length() != 0) { + sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "." 
+ + keys.get(right) + "(" + type + "),"); + } else { + // probably a cross product + sb.append("(" + type + "),"); + } + } + this.attrs.remove("condition map:"); + this.attrs.put("Conds:", sb.substring(0, sb.length() - 1)); + } + } + + private String getNameWithOpIdStats() { + StringBuffer sb = new StringBuffer(); + sb.append(DagJsonParserUtils.renameReduceOutputOperator(name, vertex)); + if (operatorId != null) { + sb.append(" [" + operatorId + "]"); + } + if (!DagJsonParserUtils.OperatorNoStats.contains(name) && attrs.containsKey("Statistics:")) { + sb.append(" (" + attrs.get("Statistics:") + ")"); + } + attrs.remove("Statistics:"); + return sb.toString(); + } + + /** + * @param printer + * @param indentFlag + * @param branchOfJoinOp + * This parameter is used to show if it is a branch of a Join + * operator so that we can decide the corresponding indent. + * @throws Exception + */ + public void print(Printer printer, int indentFlag, boolean branchOfJoinOp) throws Exception { + // print name + if (parser.printSet.contains(this)) { + printer.println(DagJsonParser.prefixString(indentFlag) + " Please refer to the previous " + + this.getNameWithOpIdStats()); + return; + } + parser.printSet.add(this); + if (!branchOfJoinOp) { + printer.println(DagJsonParser.prefixString(indentFlag) + this.getNameWithOpIdStats()); + } else { + printer.println(DagJsonParser.prefixString(indentFlag, "<-") + this.getNameWithOpIdStats()); + } + branchOfJoinOp = false; + // if this operator is a Map Join Operator or a Merge Join Operator + if (this.type == OpType.MAPJOIN || this.type == OpType.MERGEJOIN) { + inlineJoinOp(); + branchOfJoinOp = true; + } + // if this operator is the last operator, we summarize the non-inlined + // vertex + List<Connection> noninlined = new ArrayList<>(); + if (this.parent == null) { + if (this.vertex != null) { + for (Connection connection : this.vertex.parentConnections) { + if (!parser.isInline(connection.from)) { + noninlined.add(connection); + } + } + } + } + // print attr + indentFlag++; + if (!attrs.isEmpty()) { + printer.println(DagJsonParser.prefixString(indentFlag) + + DagJsonParserUtils.attrsToString(attrs)); + } + // print inline vertex + if (parser.inlineMap.containsKey(this)) { + List<Connection> connections = parser.inlineMap.get(this); + Collections.sort(connections); + for (Connection connection : connections) { + connection.from.print(printer, indentFlag, connection.type, this.vertex); + } + } + // print parent op, i.e., where data comes from + if (this.parent != null) { + this.parent.print(printer, indentFlag, branchOfJoinOp); + } + // print next vertex + else { + Collections.sort(noninlined); + for (Connection connection : noninlined) { + connection.from.print(printer, indentFlag, connection.type, this.vertex); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java new file mode 100644 index 0000000..6f040f6 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.jsonexplain; + +public final class Printer { + public static final String lineSeparator = System.getProperty("line.separator"); + private final StringBuilder builder = new StringBuilder(); + + public void print(String string) { + builder.append(string); + } + + public void println(String string) { + builder.append(string); + builder.append(lineSeparator); + } + + public void println() { + builder.append(lineSeparator); + } + + public String toString() { + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java new file mode 100644 index 0000000..d21a565 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.jsonexplain; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.jsonexplain.Vertex.VertexType; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +public final class Stage { + //external name is used to show at the console + String externalName; + //internal name is used to track the stages + public final String internalName; + //tezJsonParser + public final DagJsonParser parser; + // upstream stages, e.g., root stage + public final List<Stage> parentStages = new ArrayList<>(); + // downstream stages. 
+ public final List<Stage> childStages = new ArrayList<>(); + public final Map<String, Vertex> vertexs =new LinkedHashMap<>(); + public final Map<String, String> attrs = new TreeMap<>(); + Map<Vertex, List<Connection>> tezStageDependency; + // some stage may contain only a single operator, e.g., create table operator, + // fetch operator. + Op op; + + public Stage(String name, DagJsonParser tezJsonParser) { + super(); + internalName = name; + externalName = name; + parser = tezJsonParser; + } + + public void addDependency(JSONObject object, Map<String, Stage> stages) throws JSONException { + if (object.has("DEPENDENT STAGES")) { + String names = object.getString("DEPENDENT STAGES"); + for (String name : names.split(",")) { + Stage parent = stages.get(name.trim()); + this.parentStages.add(parent); + parent.childStages.add(this); + } + } + if (object.has("CONDITIONAL CHILD TASKS")) { + String names = object.getString("CONDITIONAL CHILD TASKS"); + this.externalName = this.internalName + "(CONDITIONAL CHILD TASKS: " + names + ")"; + for (String name : names.split(",")) { + Stage child = stages.get(name.trim()); + child.externalName = child.internalName + "(CONDITIONAL)"; + child.parentStages.add(this); + this.childStages.add(child); + } + } + } + + /** + * @param object + * @throws Exception + * If the object of stage contains "Tez", we need to extract the + * vertices and edges Else we need to directly extract operators + * and/or attributes. + */ + public void extractVertex(JSONObject object) throws Exception { + if (object.has(this.parser.getFrameworkName())) { + this.tezStageDependency = new TreeMap<>(); + JSONObject tez = (JSONObject) object.get(this.parser.getFrameworkName()); + JSONObject vertices = tez.getJSONObject("Vertices:"); + if (tez.has("Edges:")) { + JSONObject edges = tez.getJSONObject("Edges:"); + // iterate for the first time to get all the vertices + for (String to : JSONObject.getNames(edges)) { + vertexs.put(to, new Vertex(to, vertices.getJSONObject(to), parser)); + } + // iterate for the second time to get all the vertex dependency + for (String to : JSONObject.getNames(edges)) { + Object o = edges.get(to); + Vertex v = vertexs.get(to); + // 1 to 1 mapping + if (o instanceof JSONObject) { + JSONObject obj = (JSONObject) o; + String parent = obj.getString("parent"); + Vertex parentVertex = vertexs.get(parent); + if (parentVertex == null) { + parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser); + vertexs.put(parent, parentVertex); + } + String type = obj.getString("type"); + // for union vertex, we reverse the dependency relationship + if (!"CONTAINS".equals(type)) { + v.addDependency(new Connection(type, parentVertex)); + parentVertex.setType(type); + parentVertex.children.add(v); + } else { + parentVertex.addDependency(new Connection(type, v)); + v.children.add(parentVertex); + } + this.tezStageDependency.put(v, Arrays.asList(new Connection(type, parentVertex))); + } else { + // 1 to many mapping + JSONArray from = (JSONArray) o; + List<Connection> list = new ArrayList<>(); + for (int index = 0; index < from.length(); index++) { + JSONObject obj = from.getJSONObject(index); + String parent = obj.getString("parent"); + Vertex parentVertex = vertexs.get(parent); + if (parentVertex == null) { + parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser); + vertexs.put(parent, parentVertex); + } + String type = obj.getString("type"); + if (!"CONTAINS".equals(type)) { + v.addDependency(new Connection(type, parentVertex)); + 
parentVertex.setType(type); + parentVertex.children.add(v); + } else { + parentVertex.addDependency(new Connection(type, v)); + v.children.add(parentVertex); + } + list.add(new Connection(type, parentVertex)); + } + this.tezStageDependency.put(v, list); + } + } + } else { + for (String vertexName : JSONObject.getNames(vertices)) { + vertexs.put(vertexName, new Vertex(vertexName, vertices.getJSONObject(vertexName), parser)); + } + } + // extract the operator tree within each Map/Reduce vertex + for (Vertex v : vertexs.values()) { + if (v.vertexType == VertexType.MAP || v.vertexType == VertexType.REDUCE) { + v.extractOpTree(); + v.checkMultiReduceOperator(); + } + } + } else { + String[] names = JSONObject.getNames(object); + if (names != null) { + for (String name : names) { + if (name.contains("Operator")) { + this.op = extractOp(name, object.getJSONObject(name)); + } else { + if (!object.get(name).toString().isEmpty()) { + attrs.put(name, object.get(name).toString()); + } + } + } + } + } + } + + /** + * Handles a stage that contains only a single operator, e.g., the create + * table operator or the fetch operator. + * @param opName + * @param opObj + * @return + * @throws Exception + */ + Op extractOp(String opName, JSONObject opObj) throws Exception { + Map<String, String> attrs = new TreeMap<>(); + Vertex v = null; + if (opObj.length() > 0) { + String[] names = JSONObject.getNames(opObj); + for (String name : names) { + Object o = opObj.get(name); + if (isPrintable(o) && !o.toString().isEmpty()) { + attrs.put(name, o.toString()); + } else if (o instanceof JSONObject) { + JSONObject attrObj = (JSONObject) o; + if (attrObj.length() > 0) { + if (name.equals("Processor Tree:")) { + JSONObject object = new JSONObject(new LinkedHashMap<>()); + object.put(name, attrObj); + v = new Vertex(null, object, parser); + v.extractOpTree(); + } else { + for (String attrName : JSONObject.getNames(attrObj)) { + if (!attrObj.get(attrName).toString().isEmpty()) { + attrs.put(attrName, attrObj.get(attrName).toString()); + } + } + } + } + } else { + throw new Exception("Unsupported object in " + this.internalName); + } + } + } + Op op = new Op(opName, null, null, null, attrs, null, v, parser); + if (v != null) { + parser.addInline(op, new Connection(null, v)); + } + return op; + } + + private boolean isPrintable(Object val) { + if (val instanceof Boolean || val instanceof String || val instanceof Integer + || val instanceof Long || val instanceof Byte || val instanceof Float + || val instanceof Double || val instanceof Path) { + return true; + } + if (val != null && val.getClass().isPrimitive()) { + return true; + } + return false; + } + + public void print(Printer printer, int indentFlag) throws Exception { + // print the stage name + if (parser.printSet.contains(this)) { + printer.println(DagJsonParser.prefixString(indentFlag) + " Please refer to the previous " + + externalName); + return; + } + parser.printSet.add(this); + printer.println(DagJsonParser.prefixString(indentFlag) + externalName); + // print vertices + indentFlag++; + for (Vertex candidate : this.vertexs.values()) { + if (!parser.isInline(candidate) && candidate.children.isEmpty()) { + candidate.print(printer, indentFlag, null, null); + } + } + if (!attrs.isEmpty()) { + printer.println(DagJsonParser.prefixString(indentFlag) + + DagJsonParserUtils.attrsToString(attrs)); + } + if (op != null) { + op.print(printer, indentFlag, false); + } + indentFlag++; + // print dependent stages + for (Stage stage : this.parentStages) { + stage.print(printer, indentFlag); + } + } +} 
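Stage.addDependency above wires parent and child stages by splitting the comma-separated stage lists carried under the "DEPENDENT STAGES" and "CONDITIONAL CHILD TASKS" keys of the explain JSON. A minimal standalone sketch of that wiring logic, using a hypothetical SimpleStage class in place of the Hive types:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StageWiringSketch {
  // Hypothetical stand-in for Stage: just a name plus parent/child links.
  static class SimpleStage {
    final String name;
    final List<SimpleStage> parentStages = new ArrayList<>();
    final List<SimpleStage> childStages = new ArrayList<>();
    SimpleStage(String name) { this.name = name; }
  }

  public static void main(String[] args) {
    Map<String, SimpleStage> stages = new LinkedHashMap<>();
    for (String n : new String[] { "Stage-0", "Stage-1", "Stage-2" }) {
      stages.put(n, new SimpleStage(n));
    }
    // Suppose Stage-0's JSON carries "DEPENDENT STAGES": "Stage-1, Stage-2".
    SimpleStage current = stages.get("Stage-0");
    for (String name : "Stage-1, Stage-2".split(",")) {
      SimpleStage parent = stages.get(name.trim());
      current.parentStages.add(parent); // upstream link
      parent.childStages.add(current);  // downstream link
    }
    System.out.println(current.parentStages.size()); // prints 2
  }
}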
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java new file mode 100644 index 0000000..c93059d --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common.jsonexplain; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.hive.common.jsonexplain.Op.OpType; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +public final class Vertex implements Comparable<Vertex> { + public final String name; + // the DagJsonParser this vertex belongs to + public final DagJsonParser parser; + // vertex's parent connections. + public final List<Connection> parentConnections = new ArrayList<>(); + // vertex's child vertices.
+ public final List<Vertex> children = new ArrayList<>(); + // the jsonObject for this vertex + public final JSONObject vertexObject; + // whether this vertex is a dummy (which does not really exist but is created), + // e.g., a dummy vertex for a mergejoin branch + public boolean dummy; + // the rootOps in this vertex + public final List<Op> rootOps = new ArrayList<>(); + // we create a dummy vertex for a mergejoin branch for a self join if this + // vertex is a mergejoin + public final List<Vertex> mergeJoinDummyVertexs = new ArrayList<>(); + // the number of reduce operators in this vertex + public int numReduceOp = 0; + // execution mode + public String executionMode = ""; + // tagToInput for reduce work + public Map<String, String> tagToInput = new LinkedHashMap<>(); + // tag + public String tag; + + public static enum VertexType { + MAP, REDUCE, UNION, UNKNOWN + }; + public VertexType vertexType; + + public static enum EdgeType { + BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, UNKNOWN + }; + public String edgeType; + + public Vertex(String name, JSONObject vertexObject, DagJsonParser dagJsonParser) { + super(); + this.name = name; + if (this.name != null) { + if (this.name.contains("Map")) { + this.vertexType = VertexType.MAP; + } else if (this.name.contains("Reduce")) { + this.vertexType = VertexType.REDUCE; + } else if (this.name.contains("Union")) { + this.vertexType = VertexType.UNION; + } else { + this.vertexType = VertexType.UNKNOWN; + } + } else { + this.vertexType = VertexType.UNKNOWN; + } + this.dummy = false; + this.vertexObject = vertexObject; + parser = dagJsonParser; + } + + public void addDependency(Connection connection) throws JSONException { + this.parentConnections.add(connection); + } + + /** + * We assume that there is a single top-level Map Operator Tree or a + * Reduce Operator Tree in a vertex. + * @throws JSONException + * @throws JsonParseException + * @throws JsonMappingException + * @throws IOException + * @throws Exception + */ + public void extractOpTree() throws JSONException, JsonParseException, JsonMappingException, + IOException, Exception { + if (vertexObject.length() != 0) { + for (String key : JSONObject.getNames(vertexObject)) { + if (key.equals("Map Operator Tree:")) { + extractOp(vertexObject.getJSONArray(key).getJSONObject(0)); + } else if (key.equals("Reduce Operator Tree:") || key.equals("Processor Tree:")) { + extractOp(vertexObject.getJSONObject(key)); + } else if (key.equals("Join:")) { + // this is the case when we have a map-side SMB join + // one input of the join is treated as a dummy vertex + JSONArray array = vertexObject.getJSONArray(key); + for (int index = 0; index < array.length(); index++) { + JSONObject mpOpTree = array.getJSONObject(index); + Vertex v = new Vertex(null, mpOpTree, parser); + v.extractOpTree(); + v.dummy = true; + mergeJoinDummyVertexs.add(v); + } + } else if (key.equals("Merge File Operator")) { + JSONObject opTree = vertexObject.getJSONObject(key); + if (opTree.has("Map Operator Tree:")) { + extractOp(opTree.getJSONArray("Map Operator Tree:").getJSONObject(0)); + } else { + throw new Exception("Merge File Operator does not have a Map Operator Tree"); + } + } else if (key.equals("Execution mode:")) { + executionMode = " " + vertexObject.getString(key); + } else if (key.equals("tagToInput:")) { + JSONObject tagToInput = vertexObject.getJSONObject(key); + for (String tag : JSONObject.getNames(tagToInput)) { + this.tagToInput.put(tag, (String) tagToInput.get(tag)); + } + } else if (key.equals("tag:")) { + 
this.tag = vertexObject.getString(key); + } else if (key.equals("Local Work:")) { + extractOp(vertexObject.getJSONObject(key)); + } else { + throw new Exception("Unsupported operator tree in vertex " + this.name); + } + } + } + } + + /** + * Assumption: each operator has only one parent but may have many + * children. + * @param operator + * @return + * @throws JSONException + * @throws JsonParseException + * @throws JsonMappingException + * @throws IOException + * @throws Exception + */ + Op extractOp(JSONObject operator) throws JSONException, JsonParseException, JsonMappingException, + IOException, Exception { + String[] names = JSONObject.getNames(operator); + if (names.length != 1) { + throw new Exception("Expected only one operator in " + operator.toString()); + } else { + String opName = names[0]; + JSONObject attrObj = (JSONObject) operator.get(opName); + Map<String, String> attrs = new TreeMap<>(); + List<Op> children = new ArrayList<>(); + String id = null; + String outputVertexName = null; + if (JSONObject.getNames(attrObj) != null) { + for (String attrName : JSONObject.getNames(attrObj)) { + if (attrName.equals("children")) { + Object childrenObj = attrObj.get(attrName); + if (childrenObj instanceof JSONObject) { + if (((JSONObject) childrenObj).length() != 0) { + children.add(extractOp((JSONObject) childrenObj)); + } + } else if (childrenObj instanceof JSONArray) { + if (((JSONArray) childrenObj).length() != 0) { + JSONArray array = ((JSONArray) childrenObj); + for (int index = 0; index < array.length(); index++) { + children.add(extractOp(array.getJSONObject(index))); + } + } + } else { + throw new Exception("Unsupported operator " + this.name + + "'s children field is neither a JSONObject nor a JSONArray"); + } + } else { + if (attrName.equals("OperatorId:")) { + id = attrObj.get(attrName).toString(); + } else if (attrName.equals("outputname:")) { + outputVertexName = attrObj.get(attrName).toString(); + } else { + if (!attrObj.get(attrName).toString().isEmpty()) { + attrs.put(attrName, attrObj.get(attrName).toString()); + } + } + } + } + } + Op op = new Op(opName, id, outputVertexName, children, attrs, operator, this, parser); + if (!children.isEmpty()) { + for (Op child : children) { + child.parent = op; + } + } else { + this.rootOps.add(op); + } + return op; + } + } + + public void print(Printer printer, int indentFlag, String type, Vertex callingVertex) + throws JSONException, Exception { + // print the vertex name + if (parser.printSet.contains(this) && numReduceOp <= 1) { + if (type != null) { + printer.println(DagJsonParser.prefixString(indentFlag, "<-") + + " Please refer to the previous " + this.name + " [" + type + "]"); + } else { + printer.println(DagJsonParser.prefixString(indentFlag, "<-") + + " Please refer to the previous " + this.name); + } + return; + } + parser.printSet.add(this); + if (type != null) { + printer.println(DagJsonParser.prefixString(indentFlag, "<-") + this.name + " [" + type + "]" + + this.executionMode); + } else if (this.name != null) { + printer.println(DagJsonParser.prefixString(indentFlag) + this.name + this.executionMode); + } + // print operators + if (numReduceOp > 1 && callingVertex.vertexType != VertexType.UNION) { + // find the right op + Op choose = null; + for (Op op : this.rootOps) { + if (op.outputVertexName.equals(callingVertex.name)) { + choose = op; + } + } + if (choose != null) { + choose.print(printer, indentFlag, false); + } else { + throw new Exception("Cannot find the right reduce output operator for 
vertex " + this.name); + } + } else { + for (Op op : this.rootOps) { + // dummy vertex is treated as a branch of a join operator + if (this.dummy) { + op.print(printer, indentFlag, true); + } else { + op.print(printer, indentFlag, false); + } + } + } + if (vertexType == VertexType.UNION) { + // print dependent vertexs + indentFlag++; + for (int index = 0; index < this.parentConnections.size(); index++) { + Connection connection = this.parentConnections.get(index); + connection.from.print(printer, indentFlag, connection.type, this); + } + } + } + + /** + * We check if a vertex has multiple reduce operators. + */ + public void checkMultiReduceOperator() { + // check if it is a reduce vertex and its children is more than 1; + if (this.rootOps.size() < 2) { + return; + } + // check if all the child ops are reduce output operators + for (Op op : this.rootOps) { + if (op.type == OpType.RS) { + numReduceOp++; + } + } + } + + public void setType(String type) { + this.edgeType = this.parser.mapEdgeType(type); + } + + // The following code should be gone after HIVE-11075 using topological order + @Override + public int compareTo(Vertex o) { + // we print the vertex that has more rs before the vertex that has fewer rs. + if (numReduceOp != o.numReduceOp) { + return -(numReduceOp - o.numReduceOp); + } else { + return this.name.compareTo(o.name); + } + } + + public Op getJoinRSOp(Vertex joinVertex) { + if (rootOps.size() == 0) { + return null; + } else if (rootOps.size() == 1) { + if (rootOps.get(0).type == OpType.RS) { + return rootOps.get(0); + } else { + return null; + } + } else { + for (Op op : rootOps) { + if (op.type == OpType.RS) { + if (op.outputVertexName.equals(joinVertex.name)) { + return op; + } + } + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java new file mode 100644 index 0000000..9485aa4 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.common.jsonexplain.spark; + +import org.apache.hadoop.hive.common.jsonexplain.DagJsonParser; + + +public class SparkJsonParser extends DagJsonParser { + + @Override + public String mapEdgeType(String edgeName) { + return edgeName; + } + + @Override + public String getFrameworkName() { + return "Spark"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java deleted file mode 100644 index d341cb1..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.common.jsonexplain.tez; - -public final class Connection { - public final String type; - public final Vertex from; - - public Connection(String type, Vertex from) { - super(); - this.type = type; - this.from = from; - } -}
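SparkJsonParser above shows the contract a framework-specific parser fills in: map the framework's edge names to display names, and supply the JSON key under which Stage.extractVertex finds the DAG (for Spark, the edge mapping is simply the identity). A sketch of the same pattern follows; the abstract base below is an illustrative stand-in rather than the real DagJsonParser, and the edge-name values are assumptions modeled on the EdgeType enum in Vertex:

// Illustrative stand-in for DagJsonParser; the real base class carries the
// full parsing and printing logic.
abstract class DagJsonParserSketch {
  public abstract String mapEdgeType(String edgeName);
  public abstract String getFrameworkName();
}

public class TezLikeParserSketch extends DagJsonParserSketch {
  @Override
  public String mapEdgeType(String edgeName) {
    // Assumed Tez-flavored mapping from plan edge names to display names.
    switch (edgeName) {
      case "SIMPLE_EDGE":
        return "SHUFFLE";
      case "BROADCAST_EDGE":
        return "BROADCAST";
      default:
        return edgeName; // fall back to the raw name, like Spark's identity mapping
    }
  }

  @Override
  public String getFrameworkName() {
    // Stage.extractVertex looks up this key in the stage's JSON object.
    return "Tez";
  }

  public static void main(String[] args) {
    DagJsonParserSketch p = new TezLikeParserSketch();
    System.out.println(p.getFrameworkName() + ": " + p.mapEdgeType("SIMPLE_EDGE"));
  }
}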
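The Printer class at the top of this section is the sink for all of this output: Stage.print and Vertex.print append one line per plan node at an integer indent level (rendered by DagJsonParser.prefixString), and the finished plan is read back with toString(). A small usage sketch, with the prefix helper approximated by two spaces per level (an assumption; the real prefixString also supports an arrow marker such as "<-"):

public class PrinterSketch {
  private static final String LINE_SEPARATOR = System.getProperty("line.separator");
  private final StringBuilder builder = new StringBuilder();

  // Same shape as Printer.println(String) above.
  public void println(String s) {
    builder.append(s).append(LINE_SEPARATOR);
  }

  @Override
  public String toString() {
    return builder.toString();
  }

  // Rough stand-in for DagJsonParser.prefixString: two spaces per indent level.
  static String prefix(int indentFlag) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < indentFlag; i++) {
      sb.append("  ");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    PrinterSketch printer = new PrinterSketch();
    printer.println(prefix(0) + "Stage-1");
    printer.println(prefix(1) + "Map 1");
    printer.println(prefix(2) + "TableScan [TS_0]");
    // The whole plan is buffered in memory and emitted in one shot.
    System.out.print(printer);
  }
}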
