Repository: hive
Updated Branches:
  refs/heads/llap bc8de94ae -> bf834079a


HIVE-13133: Create initial InputFormat + record readers/writers (Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bf834079
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bf834079
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bf834079

Branch: refs/heads/llap
Commit: bf834079a3491bdcc65e1b839591f9db7098cf3b
Parents: bc8de94
Author: Gunther Hagleitner <gunt...@apache.org>
Authored: Tue Feb 23 17:45:10 2016 -0800
Committer: Gunther Hagleitner <gunt...@apache.org>
Committed: Tue Feb 23 18:53:46 2016 -0800

----------------------------------------------------------------------
 bin/ext/llapdump.sh | 31 +++
 bin/hive | 4 +
 .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
 .../test/resources/testconfiguration.properties | 1 +
 jdbc/pom.xml | 11 +-
 .../src/java/org/apache/hive/jdbc/LlapDump.java | 136 ++++++++++
 .../org/apache/hive/jdbc/LlapInputFormat.java | 174 ++++++++++++
 .../hadoop/hive/llap/LlapDataOutputBuffer.java | 165 ++++++++++++
 .../hadoop/hive/llap/LlapInputFormat.java | 249 +++++++++++++++++
 .../apache/hadoop/hive/llap/LlapInputSplit.java | 143 ++++++++++
 .../hadoop/hive/llap/LlapOutputFormat.java | 60 +++++
 .../hive/llap/LlapOutputFormatService.java | 141 ++++++++++
 .../hadoop/hive/llap/LlapRecordReader.java | 86 ++++++
 .../hadoop/hive/llap/LlapRecordWriter.java | 52 ++++
 .../hadoop/hive/ql/exec/FunctionRegistry.java | 2 +
 .../hive/ql/exec/SerializationUtilities.java | 2 +-
 .../hive/ql/exec/tez/HiveSplitGenerator.java | 21 +-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 2 +-
 .../hive/ql/optimizer/SimpleFetchOptimizer.java | 50 ++--
 .../ql/udf/generic/GenericUDFGetSplits.java | 265 +++++++++++++++++++
 .../hadoop/hive/llap/TestLlapOutputFormat.java | 124 +++++++++
 .../queries/clientpositive/udf_get_splits.q | 6 +
 .../clientpositive/tez/udf_get_splits.q.out | 73 +++++
 23 files changed, 1758 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/bin/ext/llapdump.sh
----------------------------------------------------------------------
diff --git a/bin/ext/llapdump.sh b/bin/ext/llapdump.sh
new file mode 100644
index 0000000..2564e82
--- /dev/null
+++ b/bin/ext/llapdump.sh
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +THISSERVICE=llapdump +export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} " + +llapdump () { + CLASS=org.apache.hive.jdbc.LlapDump + HIVE_OPTS='' + execHiveCmd $CLASS "$@" +} + +llapdump_help () { + echo "usage ./hive llapdump [-l <url>] [-u <user>] [-p <pwd>] <query>" + echo "" + echo " --location (-l) hs2 url" + echo " --user (-u) user name" + echo " --pwd (-p) password" +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/bin/hive ---------------------------------------------------------------------- diff --git a/bin/hive b/bin/hive index e9477f7..e6693f6 100755 --- a/bin/hive +++ b/bin/hive @@ -48,6 +48,10 @@ while [ $# -gt 0 ]; do SERVICE=orcfiledump shift ;; + --llapdump) + SERVICE=llapdump + shift + ;; --help) HELP=_help shift http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9cb626e..7fbcbba 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2684,6 +2684,8 @@ public class HiveConf extends Configuration { false, "Whether to setup split locations to match nodes on which llap daemons are running," + " instead of using the locations provided by the split itself"), + LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003, + "LLAP daemon output service port"), SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout", "60s", new TimeValidator(TimeUnit.SECONDS), http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 58d0a45..deb9905 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -435,6 +435,7 @@ minitez.query.files=bucket_map_join_tez1.q,\ tez_smb_main.q,\ tez_smb_1.q,\ tez_smb_empty.q,\ + udf_get_splits.q,\ vector_join_part_col_char.q,\ vectorized_dynamic_partition_pruning.q,\ tez_multi_union.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/jdbc/pom.xml b/jdbc/pom.xml index f87ab59..2be8c30 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -42,14 +42,13 @@ </dependency> <dependency> <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> <artifactId>hive-service</artifactId> <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.hive</groupId> http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java new file mode 100644 index 0000000..b0c0253 --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java @@ -0,0 +1,136 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import java.io.OutputStream; +import java.io.InputStream; +import java.io.File; +import java.io.IOException; +import java.io.FileInputStream; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.RCFile.Reader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.hive.llap.LlapRecordReader; +import org.apache.hadoop.hive.metastore.api.Schema; + +public class LlapDump { + + private static final Logger LOG = LoggerFactory.getLogger(LlapDump.class); + + private static String url = "jdbc:hive2://localhost:10000/default"; + private static String user = "hive"; + private static String pwd = ""; + private static String query = "select * from test"; + + public static void main(String[] args) throws Exception { + Options opts = createOptions(); + CommandLine cli = new GnuParser().parse(opts, args); + + if (cli.hasOption('h')) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("orcfiledump", opts); + return; + } + + if (cli.hasOption('l')) { + url = cli.getOptionValue("l"); + } + + if (cli.hasOption('u')) { + user = cli.getOptionValue("u"); + } + + if (cli.hasOption('p')) { + pwd = cli.getOptionValue("p"); + } + + if (cli.getArgs().length > 0) { + query = cli.getArgs()[0]; + } + + System.out.println("url: "+url); + System.out.println("user: "+user); + System.out.println("query: "+query); + + LlapInputFormat format = new LlapInputFormat(url, user, pwd, query); + JobConf job = new JobConf(); + InputSplit[] splits = format.getSplits(job, 1); + RecordReader<NullWritable, Text> reader = format.getRecordReader(splits[0], job, null); + + if (reader instanceof LlapRecordReader) { + Schema schema = ((LlapRecordReader)reader).getSchema(); + System.out.println(""+schema); + } + System.out.println("Results: "); + System.out.println(""); + + Text value = 
reader.createValue(); + while (reader.next(NullWritable.get(), value)) { + System.out.println(value); + } + } + + static Options createOptions() { + Options result = new Options(); + + result.addOption(OptionBuilder + .withLongOpt("location") + .withDescription("HS2 url") + .hasArg() + .create('l')); + + result.addOption(OptionBuilder + .withLongOpt("user") + .withDescription("user name") + .hasArg() + .create('u')); + + result.addOption(OptionBuilder + .withLongOpt("pwd") + .withDescription("password") + .hasArg() + .create('p')); + + return result; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java new file mode 100644 index 0000000..97fe2c5 --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.jdbc; + +import java.util.ArrayList; +import java.util.List; + +import java.sql.SQLException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.DriverManager; + +import java.io.IOException; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; + +import com.google.common.base.Preconditions; + +public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { + + private static String driverName = "org.apache.hive.jdbc.HiveDriver"; + private String url; // "jdbc:hive2://localhost:10000/default" + private String user; // "hive", + private String pwd; // "" + private String query; + + private Connection con; + private Statement stmt; + + public LlapInputFormat(String url, String user, String pwd, String query) { + this.url = url; + this.user = user; + this.pwd = pwd; + this.query = query; + } + + public LlapInputFormat() {} + + public class LlapInputSplit implements InputSplitWithLocationInfo { + InputSplitWithLocationInfo nativeSplit; + String inputFormatClassName; + + @Override + public long getLength() throws IOException { + return nativeSplit.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return nativeSplit.getLocations(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(inputFormatClassName); + out.writeUTF(nativeSplit.getClass().toString()); + nativeSplit.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + inputFormatClassName = in.readUTF(); + String splitClass = in.readUTF(); + try { + nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance(); + } catch (Exception e) { + throw new IOException(e); + } + nativeSplit.readFields(in); + } + + @Override + public SplitLocationInfo[] getLocationInfo() throws IOException { + return nativeSplit.getLocationInfo(); + } + + public InputSplit getSplit() { + return nativeSplit; + } + + public InputFormat<NullWritable, V> getInputFormat() { + try { + return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName) + .newInstance(); + } catch(Exception e) { + return null; + } + } + } + + @Override + public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + LlapInputSplit llapSplit = (LlapInputSplit)split; + return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + List<InputSplit> ins = new ArrayList<InputSplit>(); + + if (url == null || 
query == null) { + throw new IllegalStateException(); + } + + try { + Class.forName(driverName); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + + try { + con = DriverManager.getConnection(url,user,pwd); + stmt = con.createStatement(); + String sql = "select r.if_class as ic, r.split_class as sc, r.split as s from (select explode(get_splits(\""+query+"\","+numSplits+")) as r) t"; + ResultSet res = stmt.executeQuery(sql); + while (res.next()) { + // deserialize split + DataInput in = new DataInputStream(new ByteArrayInputStream(res.getBytes(3))); + InputSplit is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); // todo setAccessible on ctor + is.readFields(in); + ins.add(is); + } + + res.close(); + stmt.close(); + } catch (Exception e) { + throw new IOException(e); + } + return ins.toArray(new InputSplit[ins.size()]); // todo wrap input split with format + } + + public void close() { + try { + con.close(); + } catch (Exception e) { + // ignore + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java new file mode 100644 index 0000000..aad8968 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapDataOutputBuffer.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.llap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +/** + * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all + * synchronized modifiers. + */ +public class LlapDataOutputBuffer implements DataOutput { + + int readOffset; + int writeOffset; + byte[] buffer; + + /** Constructs a new empty buffer. */ + public LlapDataOutputBuffer(int length) { + buffer = new byte[length]; + reset(); + } + + /** + * Returns the current contents of the buffer. Data is only valid to + * {@link #getLength()}. + */ + public byte[] getData() { + return buffer; + } + + /** Returns the length of the valid data currently in the buffer. */ + public int getLength() { + return (writeOffset - readOffset) % buffer.length; + } + + /** Resets the buffer to empty. */ + public LlapDataOutputBuffer reset() { + readOffset = 0; + writeOffset = 0; + return this; + } + + /** Writes bytes from a DataInput directly into the buffer. 
*/ + public void write(DataInput in, int length) throws IOException { + // + } + + @Override + public synchronized void write(int b) throws IOException { + while (readOffset == writeOffset) { + try { + wait(); + } catch(InterruptedException e) { + } + } + buffer[writeOffset] = (byte)b; + writeOffset = (writeOffset + 1) % buffer.length; + notify(); + } + + public synchronized int read() throws IOException { + while (readOffset == writeOffset) { + try { + wait(); + } catch(InterruptedException e) { + } + } + int b = buffer[readOffset]; + readOffset = (readOffset + 1) % buffer.length; + notify(); + return b; + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + while(len-- != 0) { + write(b[off++]); + } + } + + @Override + public void write(byte b[]) throws IOException { + write(b, 0, b.length); + } + + + @Override + public void writeBoolean(boolean v) throws IOException { + write(v?1:0); + } + + @Override + public void writeByte(int v) throws IOException { + write(v); + } + + @Override + public void writeChar(int v) throws IOException { + write(v); + } + + @Override + public void writeBytes(String v) throws IOException { + write(v.getBytes(), 0, v.length()); + } + + @Override + public void writeChars(String v) throws IOException { + write(v.getBytes(), 0, v.length()); + } + + @Override + public void writeDouble(double v) throws IOException { + write(ByteBuffer.allocate(8).putDouble(v).array(),0,8); + } + + @Override + public void writeFloat(float v) throws IOException { + write(ByteBuffer.allocate(4).putFloat(v).array(),0,4); + } + + @Override + public void writeInt(int v) throws IOException { + write(v); + write(v>>>8); + write(v>>>16); + write(v>>>24); + } + + @Override + public void writeLong(long v) throws IOException { + int v1 = (int)v; + int v2 = (int)v>>>32; + write(v1); + write(v2); + } + + @Override + public void writeShort(int v) throws IOException { + write(v); + write(v>>>8); + } + + @Override + public void writeUTF(String v) throws IOException { + write(v.getBytes(), 0, v.length()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java new file mode 100644 index 0000000..4db4d32 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import javax.security.auth.login.LoginException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.Socket; + +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; + +import com.esotericsoftware.kryo.Kryo; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.io.InputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.io.FileNotFoundException; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.runtime.api.Event; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.dag.api.VertexLocationHint; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem; +import org.apache.tez.mapreduce.hadoop.MRInputHelpers; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.InputSpecUpdate; +import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.events.InputInitializerEvent; + + +import com.google.common.base.Preconditions; + +public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { + + private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); + + private TezWork work; + private Schema schema; + + public LlapInputFormat(TezWork tezWork, Schema schema) { + this.work = tezWork; + this.schema = schema; + } + + // need empty constructor for bean instantiation + public LlapInputFormat() {} + + /* + * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire + * off the work in the split to LLAP and finally return the connected socket back in an + * LlapRecordReader. 
The LlapRecordReader class reads the results from the socket. + */ + public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + + LlapInputSplit llapSplit = (LlapInputSplit)split; + + // TODO: push event into LLAP + + // this is just the portion that sets up the io to receive data + String host = split.getLocations()[0]; + String id = job.get(LlapOutputFormat.LLAP_OF_ID_KEY); + + HiveConf conf = new HiveConf(); + Socket socket = new Socket(host, + conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)); + + LOG.debug("Socket connected"); + + socket.getOutputStream().write(id.getBytes()); + socket.getOutputStream().write(0); + socket.getOutputStream().flush(); + + LOG.debug("Registered id: " + id); + + return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); + } + + /* + * getSplits() gets called as part of the GenericUDFGetSplits call to get splits. Here we create + * an array of input splits from the work item we have, figure out the location for llap and pass + * that back for the submission. getRecordReader method above uses that split info to assign the + * work to llap. + */ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + // TODO: need to build proto of plan + + DAG dag = DAG.create(work.getName()); + dag.setCredentials(job.getCredentials()); + // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag); + + DagUtils utils = DagUtils.getInstance(); + Context ctx = new Context(job); + MapWork mapWork = (MapWork) work.getAllWork().get(0); + // bunch of things get setup in the context based on conf but we need only the MR tmp directory + // for the following method. + JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork); + Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job); + FileSystem fs = scratchDir.getFileSystem(job); + try { + LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job); + Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr, + new ArrayList<LocalResource>(), fs, ctx, false, work, + work.getVertexType(mapWork)); + dag.addVertex(wx); + utils.addCredentials(mapWork, dag); + + // we have the dag now proceed to get the splits: + HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null); + splitGenerator.initializeSplitGenerator(wxConf, mapWork); + List<Event> eventList = splitGenerator.initialize(); + + // hack - just serializing with kryo for now. 
This needs to be done properly + InputSplit[] result = new InputSplit[eventList.size()]; + int i = 0; + ByteArrayOutputStream bos = new ByteArrayOutputStream(10240); + + InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) + eventList.remove(0); + + List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints(); + for (Event event: eventList) { + TaskLocationHint hint = hints.remove(0); + Set<String> hosts = hint.getHosts(); + SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()]; + + int j = 0; + for (String host: hosts) { + locations[j++] = new SplitLocationInfo(host,false); + } + + bos.reset(); + Kryo kryo = SerializationUtilities.borrowKryo(); + SerializationUtilities.serializeObjectByKryo(kryo, event, bos); + SerializationUtilities.releaseKryo(kryo); + result[i++] = new LlapInputSplit(bos.toByteArray(), locations, schema); + } + return result; + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Returns a local resource representing a jar. This resource will be used to execute the plan on + * the cluster. + * + * @param localJarPath + * Local path to the jar to be localized. + * @return LocalResource corresponding to the localized hive exec resource. + * @throws IOException + * when any file system related call fails. + * @throws LoginException + * when we are unable to determine the user. + * @throws URISyntaxException + * when current jar location cannot be determined. + */ + private LocalResource createJarLocalResource(String localJarPath, DagUtils utils, + Configuration conf) + throws IOException, LoginException, IllegalArgumentException, FileNotFoundException { + FileStatus destDirStatus = utils.getHiveJarDirectory(conf); + assert destDirStatus != null; + Path destDirPath = destDirStatus.getPath(); + + Path localFile = new Path(localJarPath); + String sha = getSha(localFile, conf); + + String destFileName = localFile.getName(); + + // Now, try to find the file based on SHA and name. Currently we require exact name match. + // We could also allow cutting off versions and other stuff provided that SHA matches... + destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha + + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName); + + // TODO: if this method is ever called on more than one jar, getting the dir and the + // list need to be refactored out to be done only once. + Path destFile = new Path(destDirPath.toString() + "/" + destFileName); + return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf); + } + + private String getSha(Path localFile, Configuration conf) + throws IOException, IllegalArgumentException { + InputStream is = null; + try { + FileSystem localFs = FileSystem.getLocal(conf); + is = localFs.open(localFile); + return DigestUtils.sha256Hex(is); + } finally { + if (is != null) { + is.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java new file mode 100644 index 0000000..78dbb34 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap; + +import java.io.IOException; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.AutoExpandingBufferWriteTransport; +import org.apache.thrift.transport.AutoExpandingBuffer; + +import com.google.common.base.Preconditions; + +public class LlapInputSplit implements InputSplitWithLocationInfo { + + byte[] queryFragment; + SplitLocationInfo[] locations; + Schema schema; + + public LlapInputSplit() {} + + public LlapInputSplit(byte[] queryFragment, SplitLocationInfo[] locations, Schema schema) { + this.queryFragment = queryFragment; + this.locations = locations; + this.schema = schema; + } + + public Schema getSchema() { + return schema; + } + + @Override + public long getLength() throws IOException { + return 0; + } + + @Override + public String[] getLocations() throws IOException { + String[] locs = new String[locations.length]; + for (int i = 0; i < locations.length; ++i) { + locs[i] = locations[i].getLocation(); + } + return locs; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(queryFragment.length); + out.write(queryFragment); + + out.writeInt(locations.length); + for (int i = 0; i < locations.length; ++i) { + out.writeUTF(locations[i].getLocation()); + } + + byte[] binarySchema; + + try { + AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d); + TProtocol protocol = new TBinaryProtocol(transport); + schema.write(protocol); + binarySchema = transport.getBuf().array(); + } catch (Exception e) { + throw new IOException(e); + } + + out.writeInt(binarySchema.length); + out.write(binarySchema); + } + + @Override + public void readFields(DataInput in) 
throws IOException { + byte[] queryFragment; + + int length = in.readInt(); + queryFragment = new byte[length]; + in.readFully(queryFragment); + + length = in.readInt(); + locations = new SplitLocationInfo[length]; + + for (int i = 0; i < length; ++i) { + locations[i] = new SplitLocationInfo(in.readUTF(), false); + } + + length = in.readInt(); + + try { + AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(length, 2d); + AutoExpandingBuffer buf = transport.getBuf(); + in.readFully(buf.array(), 0, length); + + TProtocol protocol = new TBinaryProtocol(transport); + schema = new Schema(); + schema.read(protocol); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public SplitLocationInfo[] getLocationInfo() throws IOException { + return locations; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java new file mode 100644 index 0000000..8e98aba --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; + +import com.google.common.base.Preconditions; + +/** + * + */ +public class LlapOutputFormat<K extends Writable, V extends Writable> + implements OutputFormat<K, V> { + + public static final String LLAP_OF_ID_KEY = "llap.of.id"; + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { + } + + @Override + public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { + if (!LlapProxy.isDaemon()) { + throw new IOException("LlapOutputFormat can only be used inside Llap"); + } + try { + return LlapOutputFormatService.get().<K,V>getWriter(job.get(LLAP_OF_ID_KEY)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java new file mode 100644 index 0000000..4f38ff1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap; + +import java.util.Map; +import java.util.HashMap; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.io.InputStream; +import java.io.OutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * + */ +public class LlapOutputFormatService { + + private static final Logger LOG = LoggerFactory.getLogger(LlapOutputFormat.class); + + private static LlapOutputFormatService service; + private final Map<String, RecordWriter> writers; + private final ServerSocket socket; + private final HiveConf conf; + private final ExecutorService executor; + private static final int WAIT_TIME = 5; + + private LlapOutputFormatService() throws IOException { + writers = new HashMap<String, RecordWriter>(); + conf = new HiveConf(); + executor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LLAP output %d").build()); + socket = new ServerSocket( + conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)); + } + + public static LlapOutputFormatService get() throws IOException { + if (service == null) { + service = new LlapOutputFormatService(); + service.start(); + } + return service; + } + + public void start() throws IOException { + executor.submit(new Runnable() { + byte[] buffer = new byte[4096]; + @Override + public void run() { + while (true) { + Socket s = null; + try { + s = socket.accept(); + String id = readId(s); + LOG.debug("Received: "+id); + registerReader(s, id); + } catch (IOException io) { + if (s != null) { + try{ + s.close(); + } catch (IOException io2) { + // ignore + } + } + } + } + } + + private String readId(Socket s) throws IOException { + InputStream in = s.getInputStream(); + int idx = 0; + while((buffer[idx++] = (byte)in.read()) != '\0') {} + return new String(buffer,0,idx-1); + } + + private void registerReader(Socket s, String id) throws IOException { + synchronized(service) { + LOG.debug("registering socket for: "+id); + LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream()); + writers.put(id, writer); + service.notifyAll(); + } + } + } + ); + } + + public void stop() throws IOException, InterruptedException { + executor.shutdown(); + executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); + socket.close(); + } + + public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException { + RecordWriter writer = null; + synchronized(service) { + while ((writer = writers.get(id)) == null) { + LOG.debug("Waiting for writer for: "+id); + service.wait(); + } + } + return writer; + } +} 
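For context, the client-side flow these classes enable is essentially what LlapDump (above) does: ask the
JDBC-side LlapInputFormat for splits (which in turn runs the new get_splits() UDF over HiveServer2), then
open a record reader per split to stream rows back from LlapOutputFormatService. A minimal sketch, assuming
a local HiveServer2; the connection URL, credentials and query below are placeholders:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hive.jdbc.LlapInputFormat;

public class LlapClientSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection values; the constructor matches the JDBC-side
    // LlapInputFormat added in this patch.
    LlapInputFormat<Text> format = new LlapInputFormat<Text>(
        "jdbc:hive2://localhost:10000/default", "hive", "", "select * from test");

    JobConf job = new JobConf();
    // Issues the get_splits()-based query over JDBC and deserializes the
    // returned splits.
    InputSplit[] splits = format.getSplits(job, 1);

    // The reader connects back to the LLAP daemon's output service socket
    // and streams Text rows.
    RecordReader<NullWritable, Text> reader = format.getRecordReader(splits[0], job, null);
    Text value = reader.createValue();
    while (reader.next(NullWritable.get(), value)) {
      System.out.println(value);
    }
    reader.close();
  }
}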
http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java new file mode 100644 index 0000000..ce3d39a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import java.io.IOException; +import java.io.InputStream; +import java.io.DataInputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.RCFile.Reader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.hive.metastore.api.Schema; + +public class LlapRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> { + + DataInputStream din; + Schema schema; + Class<V> clazz; + + public LlapRecordReader(InputStream in, Schema schema, Class<V> clazz) { + din = new DataInputStream(in); + this.schema = schema; + this.clazz = clazz; + } + + public Schema getSchema() { + return schema; + } + + @Override + public void close() throws IOException { + din.close(); + } + + @Override + public long getPos() { return 0; } + + @Override + public float getProgress() { return 0f; } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public V createValue() { + try { + return clazz.newInstance(); + } catch (Exception e) { + return null; + } + } + + @Override + public boolean next(NullWritable key, V value) { + try { + value.readFields(din); + return true; + } catch (IOException io) { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java new file mode 100644 index 0000000..4d1996c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.DataOutputStream;; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; + +public class LlapRecordWriter<K extends Writable, V extends WritableComparable> + implements RecordWriter<K,V> { + + DataOutputStream dos; + + public LlapRecordWriter(OutputStream out) { + dos = new DataOutputStream(out); + } + + @Override + public void close(Reporter reporter) throws IOException { + dos.close(); + } + + @Override + public void write(K key, V value) throws IOException { + value.write(dos); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index e0e030f..f3afa24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -344,6 +344,8 @@ public final class FunctionRegistry { system.registerGenericUDF("ewah_bitmap_or", GenericUDFEWAHBitmapOr.class); system.registerGenericUDF("ewah_bitmap_empty", GenericUDFEWAHBitmapEmpty.class); + system.registerGenericUDF("get_splits", GenericUDFGetSplits.class); + // Aliases for Java Class Names // These are used in getImplicitConvertUDFMethod system.registerUDF(serdeConstants.BOOLEAN_TYPE_NAME, UDFToBoolean.class, false, UDFToBoolean.class.getSimpleName()); http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java index b05a79e..eaa4293 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java @@ -570,7 +570,7 @@ public class SerializationUtilities { * @param plan Usually of type MapredWork, MapredLocalWork etc. 
* @param out stream in which serialized plan is written into */ - private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) { + public static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) { Output output = new Output(out); kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader()); kryo.writeObject(output, plan); http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 8e48c2e..b0cda82 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -74,17 +74,26 @@ public class HiveSplitGenerator extends InputInitializer { private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class); - private final DynamicPartitionPruner pruner; - private final Configuration conf; - private final JobConf jobConf; - private final MRInputUserPayloadProto userPayloadProto; - private final MapWork work; + private DynamicPartitionPruner pruner = null; + private Configuration conf = null; + private JobConf jobConf = null; + private MRInputUserPayloadProto userPayloadProto = null; + private MapWork work = null; private final SplitGrouper splitGrouper = new SplitGrouper(); - private final SplitLocationProvider splitLocationProvider; + private SplitLocationProvider splitLocationProvider = null; + + public void initializeSplitGenerator(Configuration conf, MapWork work) { + this.conf = conf; + this.work = work; + this.jobConf = new JobConf(conf); + } public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException, SerDeException { super(initializerContext); + if (initializerContext == null) { + return; + } Preconditions.checkNotNull(initializerContext); userPayloadProto = MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload()); http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 83defea..9e688ea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -412,7 +412,7 @@ public class TezTask extends Task<TezWork> { return dag; } - private void setAccessControlsForCurrentUser(DAG dag) { + public static void setAccessControlsForCurrentUser(DAG dag) { // get current user String currentUser = SessionState.getUserFromAuthenticator(); if(LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java index b5ceb14..ca8dccf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java @@ -38,6 +38,7 @@ import 
org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.LimitOperator; +import org.apache.hadoop.hive.ql.exec.UDTFOperator; import org.apache.hadoop.hive.ql.exec.ListSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -140,35 +141,31 @@ public class SimpleFetchOptimizer extends Transform { } private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) throws Exception { + boolean result = false; + if (limit > 0) { if (data.hasOnlyPruningFilter()) { /* partitioned table + query has only pruning filters */ - return true; + result = true; } else if (data.isPartitioned() == false && data.isFiltered() == false) { /* unpartitioned table + no filters */ - return true; + result = true; } /* fall through */ - } - long threshold = HiveConf.getLongVar(pctx.getConf(), - HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD); - if (threshold < 0) { - return true; - } - Operator child = data.scanOp.getChildOperators().get(0); - if(child instanceof SelectOperator) { - // select *, constant and casts can be allowed without a threshold check - if (checkExpressions((SelectOperator)child)) { - return true; + } else { + long threshold = HiveConf.getLongVar(pctx.getConf(), + HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD); + if (threshold < 0) { + result = true; + } else { + long remaining = threshold; + remaining -= data.getInputLength(pctx, remaining); + if (remaining >= 0) { + result = true; + } } } - long remaining = threshold; - remaining -= data.getInputLength(pctx, remaining); - if (remaining < 0) { - LOG.info("Threshold " + remaining + " exceeded for pseudoMR mode"); - return false; - } - return true; + return result; } // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS @@ -187,23 +184,20 @@ public class SimpleFetchOptimizer extends Transform { return null; } Table table = ts.getConf().getTableMetadata(); - if (table == null) { - return null; - } ReadEntity parent = PlanUtils.getParentViewInfo(alias, pctx.getViewAliasToInput()); - if (!table.isPartitioned()) { + if (table != null && !table.isPartitioned()) { FetchData fetch = new FetchData(ts, parent, table, splitSample); return checkOperators(fetch, aggressive, false); } boolean bypassFilter = false; - if (HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) { + if (table != null && HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) { ExprNodeDesc pruner = pctx.getOpToPartPruner().get(ts); if (PartitionPruner.onlyContainsPartnCols(table, pruner)) { bypassFilter = !pctx.getPrunedPartitions(alias, ts).hasUnknownPartitions(); } } - if (!aggressive && !bypassFilter) { + if (table != null && !aggressive && !bypassFilter) { return null; } PrunedPartitionList partitions = pctx.getPrunedPartitions(alias, ts); @@ -231,7 +225,7 @@ public class SimpleFetchOptimizer extends Transform { continue; } - if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) { + if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)) || op instanceof UDTFOperator) { break; } @@ -289,7 +283,7 @@ public class SimpleFetchOptimizer extends Transform { private boolean isConvertible(FetchData fetch, Operator<?> operator, Set<Operator<?>> traversed) { if (operator instanceof ReduceSinkOperator || operator instanceof CommonJoinOperator - || operator 
instanceof ScriptOperator || operator instanceof UDTFOperator) { + || operator instanceof ScriptOperator) { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java new file mode 100644 index 0000000..3b7dcd9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.util.UUID; +import java.io.Serializable; +import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.DataOutput; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.llap.LlapInputFormat; +import org.apache.hadoop.hive.llap.LlapOutputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.metastore.api.Schema; + + +/** + * GenericUDFGetSplits. + * + */ +@Description(name = "get_splits", value = "_FUNC_(string,int) - " + + "Returns an array of length int serialized splits for the referenced tables string.") +@UDFType(deterministic = false) +public class GenericUDFGetSplits extends GenericUDF { + + private static final Logger LOG = LoggerFactory.getLogger(GenericUDFGetSplits.class); + + private transient StringObjectInspector stringOI; + private transient IntObjectInspector intOI; + private final ArrayList<Object> retArray = new ArrayList<Object>(); + private transient JobConf jc; + private transient Hive db; + private ByteArrayOutputStream bos; + private DataOutput dos; + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) + throws UDFArgumentException { + + LOG.debug("initializing GenericUDFGetSplits"); + + try { + if (SessionState.get() != null && SessionState.get().getConf() != null) { + HiveConf conf = SessionState.get().getConf(); + jc = new JobConf(conf); + db = Hive.get(conf); + } else { + jc = MapredContext.get().getJobConf(); + db = Hive.get(); + } + } catch(HiveException e) { + LOG.error("Failed to initialize: ",e); + throw new UDFArgumentException(e); + } + + LOG.debug("Initialized conf, jc and metastore connection"); + + if (arguments.length != 2) { + throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments."); + } else if (!(arguments[0] instanceof StringObjectInspector)) { + LOG.error("Got "+arguments[0].getTypeName()+" instead of string."); + throw new UDFArgumentTypeException(0, "\"" + + "string\" is expected at function GET_SPLITS, " + "but \"" + + arguments[0].getTypeName() + "\" is found"); + } else if (!(arguments[1] instanceof IntObjectInspector)) { + LOG.error("Got "+arguments[1].getTypeName()+" instead of int."); + throw new UDFArgumentTypeException(1, "\"" + + "int\" is expected at function GET_SPLITS, " + "but \"" + + arguments[1].getTypeName() + "\" is found"); + } + + stringOI = (StringObjectInspector) arguments[0]; + intOI = (IntObjectInspector) arguments[1]; + + List<String> names = Arrays.asList("if_class","split_class","split"); + List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); + ObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs); + ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(outputOI); + bos = new ByteArrayOutputStream(1024); + dos = new DataOutputStream(bos); + + LOG.debug("done initializing GenericUDFGetSplits"); + return listOI; + } + + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { + retArray.clear(); + + String query = stringOI.getPrimitiveJavaObject(arguments[0].get()); + + int num = intOI.get(arguments[1].get()); + + Driver driver = new Driver(); + CommandProcessorResponse cpr; + + HiveConf conf = SessionState.get().getConf(); + + if (conf == null) { + throw new HiveException("Need configuration"); + } + + 
LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.toString()+"\""); + HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.toString()); + + cpr = driver.compileAndRespond(query); + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to compile query: "+cpr.getException()); + } + + QueryPlan plan = driver.getPlan(); + List<Task<?>> roots = plan.getRootTasks(); + Schema schema = plan.getResultSchema(); + + if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { + throw new HiveException("Was expecting a single TezTask."); + } + + Path data = null; + InputFormat inp = null; + String ifc = null; + + TezWork tezWork = ((TezTask)roots.get(0)).getWork(); + + if (tezWork.getAllWork().size() != 1) { + + String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", ""); + + String ctas = "create temporary table "+tableName+" as "+query; + LOG.info("CTAS: "+ctas); + + try { + cpr = driver.run(ctas, false); + } catch(CommandNeedRetryException e) { + throw new HiveException(e); + } + + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to create temp table: " + cpr.getException()); + } + + query = "select * from " + tableName; + cpr = driver.compileAndRespond(query); + if(cpr.getResponseCode() != 0) { + throw new HiveException("Failed to create temp table: "+cpr.getException()); + } + + plan = driver.getPlan(); + roots = plan.getRootTasks(); + schema = plan.getResultSchema(); + + if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) { + throw new HiveException("Was expecting a single TezTask."); + } + + tezWork = ((TezTask)roots.get(0)).getWork(); + + // Table table = db.getTable(tableName); + // if (table.isPartitioned()) { + // throw new UDFArgumentException("Table " + tableName + " is partitioned."); + // } + // data = table.getDataLocation(); + // LOG.info("looking at: "+data); + + // ifc = table.getInputFormatClass().toString(); + + // inp = ReflectionUtils.newInstance(table.getInputFormatClass(), jc); + } + + MapWork w = (MapWork)tezWork.getAllWork().get(0); + inp = new LlapInputFormat(tezWork, schema); + ifc = LlapInputFormat.class.toString(); + + try { + if (inp instanceof JobConfigurable) { + ((JobConfigurable) inp).configure(jc); + } + + if (inp instanceof FileInputFormat) { + ((FileInputFormat) inp).addInputPath(jc, data); + } + + for (InputSplit s: inp.getSplits(jc, num)) { + Object[] os = new Object[3]; + os[0] = ifc; + os[1] = s.getClass().toString(); + bos.reset(); + s.write(dos); + byte[] frozen = bos.toByteArray(); + os[2] = frozen; + retArray.add(os); + } + } catch(Exception e) { + throw new HiveException(e); + } + + return retArray; + } + + @Override + public String getDisplayString(String[] children) { + assert children.length == 2; + return getStandardDisplayString("get_splits", children); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java new file mode 100644 index 0000000..c49231c --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; + +import java.net.Socket; + +import java.io.OutputStream; +import java.io.InputStream; +import java.io.File; +import java.io.IOException; +import java.io.FileInputStream; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import junit.framework.TestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.RCFile.Reader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; + + +public class TestLlapOutputFormat { + + private static final Logger LOG = LoggerFactory.getLogger(TestLlapOutputFormat.class); + + private LlapOutputFormatService service; + + @Before + public void setUp() throws IOException { + LOG.debug("Setting up output service"); + service = LlapOutputFormatService.get(); + LlapProxy.setDaemon(true); + LOG.debug("Output service up"); + } + + @After + public void tearDown() throws IOException, InterruptedException { + LOG.debug("Tearing down service"); + service.stop(); + LOG.debug("Tearing down complete"); + } + + @Test + public void testValues() throws Exception { + JobConf job = new JobConf(); + job.set(LlapOutputFormat.LLAP_OF_ID_KEY, "foobar"); + LlapOutputFormat format = new LlapOutputFormat(); + + HiveConf conf = new HiveConf(); + Socket socket = new Socket("localhost", + conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)); + + LOG.debug("Socket connected"); + + socket.getOutputStream().write("foobar".getBytes()); + socket.getOutputStream().write(0); + socket.getOutputStream().flush(); + + Thread.sleep(3000); + + LOG.debug("Data written"); + + RecordWriter<NullWritable, Text> writer = format.getRecordWriter(null, job, null, null); + Text text = new Text(); + + LOG.debug("Have record writer"); + + for (int i = 0; i < 10; ++i) { + text.set(""+i); + writer.write(NullWritable.get(),text); + } + + writer.close(null); + + InputStream in = socket.getInputStream(); + RecordReader reader = new LlapRecordReader(in, null, Text.class); + + LOG.debug("Have record reader"); + + int count = 0; + while(reader.next(NullWritable.get(), text)) { + LOG.debug(text.toString()); 
+      count++;
+    }
+
+    Assert.assertEquals(count,10);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/test/queries/clientpositive/udf_get_splits.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udf_get_splits.q b/ql/src/test/queries/clientpositive/udf_get_splits.q
new file mode 100644
index 0000000..70400e8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/udf_get_splits.q
@@ -0,0 +1,6 @@
+set hive.fetch.task.conversion=more;
+
+DESCRIBE FUNCTION get_splits;
+DESCRIBE FUNCTION EXTENDED get_splits;
+
+select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t;

http://git-wip-us.apache.org/repos/asf/hive/blob/bf834079/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out b/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
new file mode 100644
index 0000000..c8ebe88
--- /dev/null
+++ b/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
@@ -0,0 +1,73 @@
+PREHOOK: query: DESCRIBE FUNCTION get_splits
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION get_splits
+POSTHOOK: type: DESCFUNCTION
+get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string.
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: DESCFUNCTION
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: DESCFUNCTION
+get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string.
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_74b4da448d74412d8fc0da37a405efb3
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_74b4da448d74412d8fc0da37a405efb3
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_4c8d9e53ea514617a6a72158c9c843c5
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_4c8d9e53ea514617a6a72158c9c843c5
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_42dcc08d004e411f8038701980b491e3
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_42dcc08d004e411f8038701980b491e3
+PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_9ef6460ac4a24e4fab1ea09ad94b01e3
+POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_9ef6460ac4a24e4fab1ea09ad94b01e3
+PREHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@table_bf88922034334ab08a9abb6cf8aa546e
+POSTHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@table_bf88922034334ab08a9abb6cf8aa546e
+PREHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+POSTHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+#### A masked pattern was here ####
+class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit -1434872849 218
+class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 2107621793 218
+class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit -1988206222 218
+class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 1357774915 218
+class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 605302265 218
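
For illustration only (not part of this commit): a minimal sketch of how a consumer could rebuild an InputSplit from one row returned by get_splits(). It assumes the row layout produced in GenericUDFGetSplits.evaluate() above, i.e. split_class comes from Class.toString() (so it carries a leading "class " prefix) and split holds the bytes written by InputSplit.write(DataOutput). The SplitRehydrator class name is made up for the example.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

public class SplitRehydrator {

  /** Rebuilds an InputSplit from the (split_class, split) pair emitted by get_splits(). */
  public static InputSplit rehydrate(String splitClass, byte[] bytes, JobConf conf)
      throws IOException, ClassNotFoundException {
    // split_class is Class.toString(), e.g. "class org.apache.hadoop.mapred.FileSplit",
    // so strip the "class " prefix before loading the class.
    String className =
        splitClass.startsWith("class ") ? splitClass.substring("class ".length()) : splitClass;
    Class<? extends InputSplit> clazz = Class.forName(className).asSubclass(InputSplit.class);
    // Instantiate through ReflectionUtils so Configurable/JobConfigurable splits receive the conf,
    // then read the fields back from the bytes that InputSplit.write(DataOutput) produced.
    InputSplit split = ReflectionUtils.newInstance(clazz, conf);
    split.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
    return split;
  }
}

The same pattern applies to the if_class value when the caller needs to instantiate the matching InputFormat, again after stripping the "class " prefix.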