HIVE-14071 : HIVE-14014 breaks non-file outputs (Sergey Shelukhin, reviewed by Pengcheng Xiong)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b540a65f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b540a65f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b540a65f Branch: refs/heads/branch-2.1 Commit: b540a65f32b55a73c0940d8e4fc3668f0dd531f5 Parents: 2fb5555 Author: Sergey Shelukhin <[email protected]> Authored: Wed Jun 22 18:05:56 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Wed Jun 22 18:06:06 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/llap/LlapOutputFormat.java | 3 ++- .../hadoop/hive/ql/exec/FileSinkOperator.java | 23 +++++++++--------- .../hive/ql/io/StreamingOutputFormat.java | 25 ++++++++++++++++++++ 3 files changed, 39 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b540a65f/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java index 8e98aba..bf32c63 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java @@ -31,6 +31,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.hive.ql.io.StreamingOutputFormat; import com.google.common.base.Preconditions; @@ -38,7 +39,7 @@ import com.google.common.base.Preconditions; * */ public class LlapOutputFormat<K extends Writable, V extends Writable> - implements OutputFormat<K, V> { + implements OutputFormat<K, V>, StreamingOutputFormat { public static final String LLAP_OF_ID_KEY 
= "llap.of.id"; http://git-wip-us.apache.org/repos/asf/hive/blob/b540a65f/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 1f95885..e154d13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.HivePartitioner; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; +import org.apache.hadoop.hive.ql.io.StreamingOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveFatalException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; @@ -146,7 +147,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements Path taskOutputTempPath; Path[] outPaths; Path[] finalPaths; - RecordWriter[] outWriters; + RecordWriter[] outWriters; // TODO# this RecordUpdater[] updaters; Stat stat; int acidLastBucket = -1; @@ -183,14 +184,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } } - public void setOutWriters(RecordWriter[] out) { - outWriters = out; - } - - public RecordWriter[] getOutWriters() { - return outWriters; - } - public void closeWriters(boolean abort) throws HiveException { for (int idx = 0; idx < outWriters.length; idx++) { if (outWriters[idx] != null) { @@ -1012,8 +1005,16 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements row_count.set(numRows); LOG.info(toString() + ": records written - " + numRows); - if (!bDynParts && !filesCreated && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { - 
createBucketFiles(fsp); + if (!bDynParts && !filesCreated) { + boolean skipFiles = "tez".equalsIgnoreCase( + HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE)); + if (skipFiles) { + Class<?> clazz = conf.getTableInfo().getOutputFileFormatClass(); + skipFiles = !StreamingOutputFormat.class.isAssignableFrom(clazz); + } + if (!skipFiles) { + createBucketFiles(fsp); + } } lastProgressReport = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/hive/blob/b540a65f/ql/src/java/org/apache/hadoop/hive/ql/io/StreamingOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/StreamingOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/StreamingOutputFormat.java new file mode 100644 index 0000000..bdc80cc --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/StreamingOutputFormat.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io; + +/** + * Marker interface for streaming output formats. For now, this is used to opt out of the + * optimizations that skip writing empty results (to avoid creating empty files). + */ +public interface StreamingOutputFormat { +}
