HIVE-13519: Allow LlapRecordReader to parse/output rows
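This change splits the old Text-only LlapRecordReader into LlapBaseRecordReader (raw Text records) and a new LlapRowRecordReader that deserializes each record into a Row, using the Schema (FieldDesc/TypeDesc) now carried on the input split. A minimal usage sketch, following the updated TestJdbcWithMiniLlap test in this commit; the class name, HS2 URL, credentials, and query below are placeholders:

    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hive.jdbc.LlapBaseInputFormat;
    import org.apache.hive.jdbc.LlapRowInputFormat;

    public class LlapRowExample {
      public static void main(String[] args) throws Exception {
        // Connection/query settings are read from the job conf by LlapBaseInputFormat.getSplits().
        JobConf job = new JobConf();
        job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default"); // placeholder endpoint
        job.set(LlapBaseInputFormat.USER_KEY, "hive");                                // placeholder credentials
        job.set(LlapBaseInputFormat.PWD_KEY, "");
        job.set(LlapBaseInputFormat.QUERY_KEY, "select * from testtab");              // placeholder query

        LlapRowInputFormat inputFormat = new LlapRowInputFormat();
        for (InputSplit split : inputFormat.getSplits(job, 1)) {
          RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
          Row row = reader.createValue();
          while (reader.next(NullWritable.get(), row)) {
            // Column values are Hadoop Writables, addressable by index or by column name.
            System.out.println(row.getValue(0));
          }
          reader.close();
        }
      }
    }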
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc7343dd Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc7343dd Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc7343dd Branch: refs/heads/master Commit: fc7343dd12ac152267615e6ac67238ee06326452 Parents: 8f6b28a Author: Jason Dere <jd...@hortonworks.com> Authored: Thu Apr 14 14:30:45 2016 -0700 Committer: Jason Dere <jd...@hortonworks.com> Committed: Thu Apr 14 14:30:45 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/jdbc/TestLlapInputSplit.java | 21 +- .../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 38 ++-- jdbc/pom.xml | 5 + .../apache/hive/jdbc/LlapBaseInputFormat.java | 135 ++++++++++++ .../src/java/org/apache/hive/jdbc/LlapDump.java | 11 +- .../org/apache/hive/jdbc/LlapInputFormat.java | 135 ------------ .../apache/hive/jdbc/LlapRowInputFormat.java | 34 +++ llap-common/pom.xml | 21 ++ .../org/apache/hadoop/hive/llap/FieldDesc.java | 63 ++++++ .../hadoop/hive/llap/LlapRowRecordReader.java | 155 ++++++++++++++ .../java/org/apache/hadoop/hive/llap/Row.java | 166 +++++++++++++++ .../org/apache/hadoop/hive/llap/Schema.java | 76 +++++++ .../org/apache/hadoop/hive/llap/TypeDesc.java | 108 ++++++++++ .../org/apache/hadoop/hive/llap/TestRow.java | 92 +++++++++ .../hadoop/hive/llap/LlapInputFormat.java | 27 ++- .../hadoop/hive/llap/LlapBaseRecordReader.java | 205 ++++++++++++++++++ .../apache/hadoop/hive/llap/LlapInputSplit.java | 27 +-- .../hadoop/hive/llap/LlapRecordReader.java | 206 ------------------- .../ql/udf/generic/GenericUDTFGetSplits.java | 87 +++++++- .../hadoop/hive/llap/TestLlapOutputFormat.java | 6 +- 20 files changed, 1205 insertions(+), 413 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java index 338930e..366e326 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java @@ -10,8 +10,10 @@ import java.util.HashMap; import org.apache.hadoop.io.Text; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.llap.Schema; +import org.apache.hadoop.hive.llap.FieldDesc; +import org.apache.hadoop.hive.llap.TypeDesc; + import org.apache.hadoop.mapred.SplitLocationInfo; import org.junit.After; import org.junit.Before; @@ -32,14 +34,11 @@ public class TestLlapInputSplit { new SplitLocationInfo("location1", false), new SplitLocationInfo("location2", false), }; - ArrayList<FieldSchema> fields = new ArrayList<FieldSchema>(); - fields.add(new FieldSchema("col1", "string", "comment1")); - fields.add(new FieldSchema("col2", "int", "comment2")); - HashMap<String, String> properties = new HashMap<String, String>(); - properties.put("key1", "val1"); - Schema schema = new Schema( - fields, - properties); + + ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>(); + colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING))); + colDescs.add(new FieldDesc("col2", new 
TypeDesc(TypeDesc.Type.INT))); + Schema schema = new Schema(colDescs); org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit( splitNum, @@ -94,7 +93,7 @@ public class TestLlapInputSplit { assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk()); } assertArrayEquals(split1.getLocations(), split2.getLocations()); - assertEquals(split1.getSchema(), split2.getSchema()); + assertEquals(split1.getSchema().toString(), split2.getSchema().toString()); assertEquals(split1.getLlapUser(), split2.getLlapUser()); } http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java index 98daab4..deeac2e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java @@ -60,16 +60,16 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.LlapRecordReader; -import org.apache.hadoop.hive.metastore.ObjectStore; -import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.llap.LlapRowRecordReader; +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.hive.llap.Schema; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.apache.hive.jdbc.LlapInputFormat; +import org.apache.hive.jdbc.LlapBaseInputFormat; +import org.apache.hive.jdbc.LlapRowInputFormat; import org.datanucleus.ClassLoaderResolver; import org.datanucleus.NucleusContext; @@ -109,8 +109,6 @@ public class TestJdbcWithMiniLlap { conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/llap-daemon-site.xml")); miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); @@ -202,10 +200,14 @@ public class TestJdbcWithMiniLlap { String user = System.getProperty("user.name"); String pwd = user; - LlapInputFormat inputFormat = new LlapInputFormat(url, user, pwd, query); + LlapRowInputFormat inputFormat = new LlapRowInputFormat(); // Get splits JobConf job = new JobConf(conf); + job.set(LlapBaseInputFormat.URL_KEY, url); + job.set(LlapBaseInputFormat.USER_KEY, user); + job.set(LlapBaseInputFormat.PWD_KEY, pwd); + job.set(LlapBaseInputFormat.QUERY_KEY, query); InputSplit[] splits = inputFormat.getSplits(job, numSplits); assertTrue(splits.length > 0); @@ -216,10 +218,12 @@ public class TestJdbcWithMiniLlap { for (InputSplit split : splits) { System.out.println("Processing split " + split.getLocations()); - RecordReader<NullWritable, Text> reader = inputFormat.getRecordReader(split, job, null); - if (reader instanceof LlapRecordReader && first) { - Schema schema = ((LlapRecordReader)reader).getSchema(); + int numColumns = 2; + RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null); + if (reader instanceof LlapRowRecordReader && first) { + Schema schema = ((LlapRowRecordReader) 
reader).getSchema(); System.out.println(""+schema); + assertEquals(numColumns, schema.getColumns().size()); } if (first) { @@ -228,9 +232,15 @@ public class TestJdbcWithMiniLlap { first = false; } - Text value = reader.createValue(); - while (reader.next(NullWritable.get(), value)) { - System.out.println(value); + Row row = reader.createValue(); + while (reader.next(NullWritable.get(), row)) { + for (int idx = 0; idx < numColumns; idx++) { + if (idx > 0) { + System.out.print(", "); + } + System.out.print(row.getValue(idx)); + } + System.out.println(""); ++rowCount; } } http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 2be8c30..c99a351 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -70,6 +70,11 @@ <artifactId>hive-service-rpc</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-llap-common</artifactId> + <version>${project.version}</version> + </dependency> <!-- inter-project --> <dependency> <groupId>org.apache.httpcomponents</groupId> http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java new file mode 100644 index 0000000..a0ddeaa --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.jdbc; + +import java.util.ArrayList; +import java.util.List; + +import java.sql.SQLException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.DriverManager; + +import java.io.IOException; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; + +import com.google.common.base.Preconditions; + +public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { + + private static String driverName = "org.apache.hive.jdbc.HiveDriver"; + private String url; // "jdbc:hive2://localhost:10000/default" + private String user; // "hive", + private String pwd; // "" + private String query; + + public static final String URL_KEY = "llap.if.hs2.connection"; + public static final String QUERY_KEY = "llap.if.query"; + public static final String USER_KEY = "llap.if.user"; + public static final String PWD_KEY = "llap.if.pwd"; + + public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; + + private Connection con; + private Statement stmt; + + public LlapBaseInputFormat(String url, String user, String pwd, String query) { + this.url = url; + this.user = user; + this.pwd = pwd; + this.query = query; + } + + public LlapBaseInputFormat() {} + + + @Override + public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + LlapInputSplit llapSplit = (LlapInputSplit) split; + return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + List<InputSplit> ins = new ArrayList<InputSplit>(); + + if (url == null) url = job.get(URL_KEY); + if (query == null) query = job.get(QUERY_KEY); + if (user == null) user = job.get(USER_KEY); + if (pwd == null) pwd = job.get(PWD_KEY); + + if (url == null || query == null) { + throw new IllegalStateException(); + } + + try { + Class.forName(driverName); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + + try { + con = DriverManager.getConnection(url,user,pwd); + stmt = con.createStatement(); + String sql = String.format(SPLIT_QUERY, query, numSplits); + ResultSet res = stmt.executeQuery(sql); + while (res.next()) { + // deserialize split + DataInput in = new DataInputStream(res.getBinaryStream(3)); + InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); + is.readFields(in); + ins.add(new LlapInputSplit(is, res.getString(1))); + } + + res.close(); + stmt.close(); + } catch (Exception e) { + throw new IOException(e); + } + return ins.toArray(new 
InputSplit[ins.size()]); + } + + public void close() { + try { + con.close(); + } catch (Exception e) { + // ignore + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java index 7ed0a0e..4c3c3ab 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java +++ b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java @@ -49,10 +49,9 @@ import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.llap.io.api.LlapProxy; -import org.apache.hadoop.hive.llap.LlapRecordReader; -import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.llap.LlapBaseRecordReader; +import org.apache.hadoop.hive.llap.Schema; public class LlapDump { @@ -98,7 +97,7 @@ public class LlapDump { System.out.println("user: "+user); System.out.println("query: "+query); - LlapInputFormat format = new LlapInputFormat(url, user, pwd, query); + LlapBaseInputFormat format = new LlapBaseInputFormat(url, user, pwd, query); JobConf job = new JobConf(); InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits)); @@ -113,8 +112,8 @@ public class LlapDump { LOG.info("Processing input split s from " + Arrays.toString(s.getLocations())); RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null); - if (reader instanceof LlapRecordReader && first) { - Schema schema = ((LlapRecordReader)reader).getSchema(); + if (reader instanceof LlapBaseRecordReader && first) { + Schema schema = ((LlapBaseRecordReader)reader).getSchema(); System.out.println(""+schema); } http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java deleted file mode 100644 index 9a7c16d..0000000 --- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hive.jdbc; - -import java.util.ArrayList; -import java.util.List; - -import java.sql.SQLException; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.Statement; -import java.sql.DriverManager; - -import java.io.IOException; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.DataInputStream; -import java.io.ByteArrayInputStream; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Progressable; - -import com.google.common.base.Preconditions; - -public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { - - private static String driverName = "org.apache.hive.jdbc.HiveDriver"; - private String url; // "jdbc:hive2://localhost:10000/default" - private String user; // "hive", - private String pwd; // "" - private String query; - - public final String URL_KEY = "llap.if.hs2.connection"; - public final String QUERY_KEY = "llap.if.query"; - public final String USER_KEY = "llap.if.user"; - public final String PWD_KEY = "llap.if.pwd"; - - public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; - - private Connection con; - private Statement stmt; - - public LlapInputFormat(String url, String user, String pwd, String query) { - this.url = url; - this.user = user; - this.pwd = pwd; - this.query = query; - } - - public LlapInputFormat() {} - - - @Override - public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { - LlapInputSplit llapSplit = (LlapInputSplit) split; - return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter); - } - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - List<InputSplit> ins = new ArrayList<InputSplit>(); - - if (url == null) url = job.get(URL_KEY); - if (query == null) query = job.get(QUERY_KEY); - if (user == null) user = job.get(USER_KEY); - if (pwd == null) pwd = job.get(PWD_KEY); - - if (url == null || query == null) { - throw new IllegalStateException(); - } - - try { - Class.forName(driverName); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } - - try { - con = DriverManager.getConnection(url,user,pwd); - stmt = con.createStatement(); - String sql = String.format(SPLIT_QUERY, query, numSplits); - ResultSet res = stmt.executeQuery(sql); - while (res.next()) { - // deserialize split - DataInput in = new DataInputStream(res.getBinaryStream(3)); - InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); - is.readFields(in); - ins.add(new LlapInputSplit(is, res.getString(1))); - } - - res.close(); - stmt.close(); - } catch (Exception e) { - throw new IOException(e); - } - return ins.toArray(new InputSplit[ins.size()]); - } - - public 
void close() { - try { - con.close(); - } catch (Exception e) { - // ignore - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java new file mode 100644 index 0000000..1cca66a --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java @@ -0,0 +1,34 @@ +package org.apache.hive.jdbc; + +import java.io.IOException; + +import org.apache.hadoop.hive.llap.LlapBaseRecordReader; +import org.apache.hadoop.hive.llap.LlapRowRecordReader; +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.hive.llap.Schema; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +public class LlapRowInputFormat implements InputFormat<NullWritable, Row> { + LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>(); + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + return baseInputFormat.getSplits(job, numSplits); + } + + @Override + public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter) + throws IOException { + LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split; + LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter); + return new LlapRowRecordReader(job, reader.getSchema(), reader); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/pom.xml ---------------------------------------------------------------------- diff --git a/llap-common/pom.xml b/llap-common/pom.xml index 5343479..ceac83b 100644 --- a/llap-common/pom.xml +++ b/llap-common/pom.xml @@ -39,6 +39,11 @@ <artifactId>hive-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + <version>${project.version}</version> + </dependency> <!-- inter-project --> <dependency> @@ -58,6 +63,22 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-api</artifactId> <version>${tez.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java b/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java new file mode 100644 index 0000000..9621978 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java @@ -0,0 +1,63 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.Writable; + +public class FieldDesc implements Writable { + private String name; + private TypeDesc typeDesc; + + public FieldDesc() { + typeDesc = new TypeDesc(); + } + + public FieldDesc(String name, TypeDesc typeDesc) { + this.name = name; + this.typeDesc = typeDesc; + } + + public String getName() { + return name; + } + + public TypeDesc getTypeDesc() { + return typeDesc; + } + + @Override + public String toString() { + return getName() + ":" + getTypeDesc().toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(name); + typeDesc.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + name = in.readUTF(); + typeDesc.readFields(in); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java new file mode 100644 index 0000000..4e000ff --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java @@ -0,0 +1,155 @@ +package org.apache.hadoop.hive.llap; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; + +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.hive.llap.FieldDesc; +import org.apache.hadoop.hive.llap.Schema; +import org.apache.hadoop.hive.llap.TypeDesc; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + + +public class LlapRowRecordReader implements RecordReader<NullWritable, Row> { + + private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class); + + Configuration conf; + RecordReader<NullWritable, Text> reader; + Schema schema; + SerDe serde; + final Text textData = new Text(); + + public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) { + this.conf = conf; + this.schema = schema; + this.reader = reader; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public Row createValue() { + return new Row(schema); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public float getProgress() throws IOException { + return 0; + } + + @Override + public boolean next(NullWritable key, Row value) throws IOException { + Preconditions.checkArgument(value != null); + + if (serde == null) { + try { + serde = initSerDe(conf); + } catch (SerDeException err) { + throw new IOException(err); + } + } + + boolean hasNext = reader.next(key, textData); + if (hasNext) { + // Deserialize Text to column values, and populate the row record + Object rowObj; + try { + StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector(); + rowObj = serde.deserialize(textData); + List<? extends StructField> colFields = rowOI.getAllStructFieldRefs(); + for (int idx = 0; idx < colFields.size(); ++idx) { + StructField field = colFields.get(idx); + Object colValue = rowOI.getStructFieldData(rowObj, field); + Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE, + "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName()); + + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector(); + // char/varchar special cased here since the row record handles them using Text + switch (poi.getPrimitiveCategory()) { + case CHAR: + value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue()); + break; + case VARCHAR: + value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue()); + break; + default: + value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue)); + break; + } + } + } catch (SerDeException err) { + if (LOG.isDebugEnabled()) { + LOG.debug("Error deserializing row from text: " + textData); + } + throw new IOException("Error deserializing row data", err); + } + } + + return hasNext; + } + + public Schema getSchema() { + return schema; + } + + protected SerDe initSerDe(Configuration conf) throws SerDeException { + Properties props = new Properties(); + StringBuffer columnsBuffer = new StringBuffer(); + StringBuffer typesBuffer = new StringBuffer(); + boolean isFirst = true; + for (FieldDesc colDesc : schema.getColumns()) { + if (!isFirst) { + columnsBuffer.append(','); + typesBuffer.append(','); + } + columnsBuffer.append(colDesc.getName()); + typesBuffer.append(colDesc.getTypeDesc().toString()); + isFirst = false; + } + String columns = columnsBuffer.toString(); + String types = typesBuffer.toString(); + props.put(serdeConstants.LIST_COLUMNS, columns); + props.put(serdeConstants.LIST_COLUMN_TYPES, types); + SerDe serde = new LazySimpleSerDe(); + serde.initialize(conf, props); + + return serde; + } +} 
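For reference, a minimal sketch (a hypothetical helper, not part of this commit) of consuming a Row produced by LlapRowRecordReader: the Row exposes the Schema it was created with, and getValue() returns the column's Writable, or null when the column value was null:

    import java.util.List;
    import org.apache.hadoop.hive.llap.FieldDesc;
    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.io.Writable;

    public class RowPrinter {
      // Print each column of a Row using the schema the Row carries.
      public static void dump(Row row) {
        List<FieldDesc> cols = row.getSchema().getColumns();
        for (int i = 0; i < cols.size(); ++i) {
          FieldDesc col = cols.get(i);
          Writable val = row.getValue(i); // null if the column value was null
          System.out.println(col.getName() + " (" + col.getTypeDesc() + ") = " + val);
        }
      }
    }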
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java b/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java new file mode 100644 index 0000000..a84fadc --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable; +import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; + +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + + +public class Row { + private final Schema schema; + private final Writable[] colValues; + private final boolean[] nullIndicators; + private Map<String, Integer> nameToIndexMapping; + + public Row(Schema schema) { + this.schema = schema; + this.colValues = new Writable[schema.getColumns().size()]; + this.nullIndicators = new boolean[schema.getColumns().size()]; + this.nameToIndexMapping = new HashMap<String, Integer>(schema.getColumns().size()); + + List<FieldDesc> colDescs = schema.getColumns(); + for (int idx = 0; idx < colDescs.size(); ++idx) { + FieldDesc colDesc = colDescs.get(idx); + nameToIndexMapping.put(colDesc.getName(), idx); + colValues[idx] = createWritableForType(colDesc.getTypeDesc()); + } + } + + public Writable getValue(int colIndex) { + if (nullIndicators[colIndex]) { + return null; + } + return colValues[colIndex]; + } + + public Writable getValue(String colName) { + Integer idx = nameToIndexMapping.get(colName); + Preconditions.checkArgument(idx != null); + return getValue(idx); + } + + public Schema getSchema() { + return schema; + } + + void setValue(int colIdx, Writable value) { + Preconditions.checkArgument(colIdx <= schema.getColumns().size()); + + if (value == null) { + nullIndicators[colIdx] = true; + } else { + nullIndicators[colIdx] = false; + FieldDesc colDesc = 
schema.getColumns().get(colIdx); + switch (colDesc.getTypeDesc().getType()) { + case BOOLEAN: + ((BooleanWritable) colValues[colIdx]).set(((BooleanWritable) value).get()); + break; + case TINYINT: + ((ByteWritable) colValues[colIdx]).set(((ByteWritable) value).get()); + break; + case SMALLINT: + ((ShortWritable) colValues[colIdx]).set(((ShortWritable) value).get()); + break; + case INT: + ((IntWritable) colValues[colIdx]).set(((IntWritable) value).get()); + break; + case BIGINT: + ((LongWritable) colValues[colIdx]).set(((LongWritable) value).get()); + break; + case FLOAT: + ((FloatWritable) colValues[colIdx]).set(((FloatWritable) value).get()); + break; + case DOUBLE: + ((DoubleWritable) colValues[colIdx]).set(((DoubleWritable) value).get()); + break; + case STRING: + // Just handle char/varchar as Text + case CHAR: + case VARCHAR: + ((Text) colValues[colIdx]).set((Text) value); + break; + case DATE: + ((DateWritable) colValues[colIdx]).set((DateWritable) value); + break; + case TIMESTAMP: + ((TimestampWritable) colValues[colIdx]).set((TimestampWritable) value); + break; + case BINARY: + ((BytesWritable) colValues[colIdx]).set(((BytesWritable) value)); + break; + case DECIMAL: + ((HiveDecimalWritable) colValues[colIdx]).set((HiveDecimalWritable) value); + break; + } + } + } + + private Writable createWritableForType(TypeDesc typeDesc) { + switch (typeDesc.getType()) { + case BOOLEAN: + return new BooleanWritable(); + case TINYINT: + return new ByteWritable(); + case SMALLINT: + return new ShortWritable(); + case INT: + return new IntWritable(); + case BIGINT: + return new LongWritable(); + case FLOAT: + return new FloatWritable(); + case DOUBLE: + return new DoubleWritable(); + case STRING: + // Just handle char/varchar as Text + case CHAR: + case VARCHAR: + return new Text(); + case DATE: + return new DateWritable(); + case TIMESTAMP: + return new TimestampWritable(); + case BINARY: + return new BytesWritable(); + case DECIMAL: + return new HiveDecimalWritable(); + default: + throw new RuntimeException("Cannot create writable for " + typeDesc.getType()); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java b/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java new file mode 100644 index 0000000..c1bf4ea --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.io.Writable; + +public class Schema implements Writable { + + private final List<FieldDesc> columns; + + public Schema(List<FieldDesc> columns) { + this.columns = columns; + } + + public Schema() { + columns = new ArrayList<FieldDesc>(); + } + + public List<FieldDesc> getColumns() { + return columns; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + boolean first = true; + for (FieldDesc colDesc : getColumns()) { + if (!first) { + sb.append(","); + } + sb.append(colDesc.toString()); + first = false; + } + return sb.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(columns.size()); + for (FieldDesc column : columns) { + column.write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + int numColumns = in.readInt(); + columns.clear(); + for (int idx = 0; idx < numColumns; ++idx) { + FieldDesc colDesc = new FieldDesc(); + colDesc.readFields(in); + columns.add(colDesc); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java b/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java new file mode 100644 index 0000000..dda5928 --- /dev/null +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.Writable; + +public class TypeDesc implements Writable { + public static enum Type { + BOOLEAN, + TINYINT, + SMALLINT, + INT, + BIGINT, + FLOAT, + DOUBLE, + STRING, + CHAR, + VARCHAR, + DATE, + TIMESTAMP, + BINARY, + DECIMAL, + } + + private TypeDesc.Type type; + private int precision; + private int scale; + + // For types with no type qualifiers + public TypeDesc(TypeDesc.Type type) { + this(type, 0, 0); + } + + // For decimal types + public TypeDesc(TypeDesc.Type type, int precision, int scale) { + this.type = type; + this.precision = precision; + this.scale = scale; + } + + // For char/varchar types + public TypeDesc(TypeDesc.Type type, int precision) { + this(type, precision, 0); + } + + // Should be used for serialization only + public TypeDesc() { + this(TypeDesc.Type.INT, 0, 0); + } + + public TypeDesc.Type getType() { + return type; + } + + public int getPrecision() { + return precision; + } + + public int getScale() { + return scale; + } + + @Override + public String toString() { + switch (type) { + case DECIMAL: + return type.name().toLowerCase() + "(" + precision + "," + scale + ")"; + case CHAR: + case VARCHAR: + return type.name().toLowerCase() + "(" + precision + ")"; + default: + return type.name().toLowerCase(); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(type.name()); + out.writeInt(precision); + out.writeInt(scale); + } + + @Override + public void readFields(DataInput in) throws IOException { + type = TypeDesc.Type.valueOf(in.readUTF()); + precision = in.readInt(); + scale = in.readInt(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java ---------------------------------------------------------------------- diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java b/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java new file mode 100644 index 0000000..d4e68f4 --- /dev/null +++ b/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.lang.RandomStringUtils; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestRow { + + @Test + public void testUsage() { + Schema schema = createTestSchema(); + Row row = new Row(schema); + + Random rand = new Random(); + int iterations = 100; + Text col0 = new Text(); + IntWritable col1 = new IntWritable(); + for (int idx = 0; idx < iterations; ++idx) { + // Set the row values + boolean isNullCol0 = (rand.nextDouble() <= 0.25); + col0.set(RandomStringUtils.random(10)); + row.setValue(0, isNullCol0 ? null : col0); + + boolean isNullCol1 = (rand.nextDouble() <= 0.25); + col1.set(rand.nextInt()); + row.setValue(1, isNullCol1 ? null : col1); + + // Validate the row values + if (isNullCol0) { + assertTrue(row.getValue(0) == null); + assertTrue(row.getValue("col0") == null); + } else { + assertTrue(row.getValue(0) != null); + assertTrue(col0 != row.getValue(0)); + assertEquals(col0, row.getValue(0)); + assertEquals(col0, row.getValue("col0")); + } + + if (isNullCol1) { + assertTrue(row.getValue(1) == null); + assertTrue(row.getValue("col1") == null); + } else { + assertTrue(row.getValue(1) != null); + assertTrue(col1 != row.getValue(1)); + assertEquals(col1, row.getValue(1)); + assertEquals(col1, row.getValue("col1")); + } + } + } + + private Schema createTestSchema() { + List<FieldDesc> colDescs = new ArrayList<FieldDesc>(); + + colDescs.add(new FieldDesc("col0", + new TypeDesc(TypeDesc.Type.STRING))); + + colDescs.add(new FieldDesc("col1", + new TypeDesc(TypeDesc.Type.INT))); + + Schema schema = new Schema(colDescs); + return schema; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java index aaca7d6..0930d60 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java @@ -30,7 +30,7 @@ import com.google.protobuf.ByteString; import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.LlapRecordReader.ReaderEvent; +import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; @@ -74,7 +74,6 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); - public LlapInputFormat() { } @@ -135,7 +134,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma LOG.info("Registered id: " + id); - LlapRecordReader recordReader = new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); + LlapBaseRecordReader recordReader = new 
LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); umbilicalResponder.setRecordReader(recordReader); return recordReader; } @@ -276,7 +275,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma } private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder { - protected LlapRecordReader recordReader = null; + protected LlapBaseRecordReader recordReader = null; protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>(); public LlapRecordReaderTaskUmbilicalExternalResponder() { @@ -285,7 +284,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma @Override public void submissionFailed(String fragmentId, Throwable throwable) { try { - sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent( + sendOrQueueEvent(ReaderEvent.errorEvent( "Received submission failed event for fragment ID " + fragmentId)); } catch (Exception err) { LOG.error("Error during heartbeat responder:", err); @@ -301,11 +300,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma try { switch (eventType) { case TASK_ATTEMPT_COMPLETED_EVENT: - sendOrQueueEvent(LlapRecordReader.ReaderEvent.doneEvent()); + sendOrQueueEvent(ReaderEvent.doneEvent()); break; case TASK_ATTEMPT_FAILED_EVENT: TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent(); - sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics())); + sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics())); break; case TASK_STATUS_UPDATE_EVENT: // If we want to handle counters @@ -323,7 +322,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma @Override public void taskKilled(TezTaskAttemptID taskAttemptId) { try { - sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent( + sendOrQueueEvent(ReaderEvent.errorEvent( "Received task killed event for task ID " + taskAttemptId)); } catch (Exception err) { LOG.error("Error during heartbeat responder:", err); @@ -333,18 +332,18 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma @Override public void heartbeatTimeout(String taskAttemptId) { try { - sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent( + sendOrQueueEvent(ReaderEvent.errorEvent( "Timed out waiting for heartbeat for task ID " + taskAttemptId)); } catch (Exception err) { LOG.error("Error during heartbeat responder:", err); } } - public synchronized LlapRecordReader getRecordReader() { + public synchronized LlapBaseRecordReader getRecordReader() { return recordReader; } - public synchronized void setRecordReader(LlapRecordReader recordReader) { + public synchronized void setRecordReader(LlapBaseRecordReader recordReader) { this.recordReader = recordReader; if (recordReader == null) { @@ -353,7 +352,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma // If any events were queued by the responder, give them to the record reader now. while (!queuedEvents.isEmpty()) { - LlapRecordReader.ReaderEvent readerEvent = queuedEvents.poll(); + ReaderEvent readerEvent = queuedEvents.poll(); LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType()); recordReader.handleEvent(readerEvent); } @@ -365,8 +364,8 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma * since we don't want to drop these events. 
* @param readerEvent */ - protected synchronized void sendOrQueueEvent(LlapRecordReader.ReaderEvent readerEvent) { - LlapRecordReader recordReader = getRecordReader(); + protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) { + LlapBaseRecordReader recordReader = getRecordReader(); if (recordReader != null) { recordReader.handleEvent(readerEvent); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java new file mode 100644 index 0000000..7073280 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.DataInputStream; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.Schema; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> { + private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class); + + DataInputStream din; + Schema schema; + Class<V> clazz; + + + protected Thread readerThread = null; + protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>(); + + public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) { + din = new DataInputStream(in); + this.schema = schema; + this.clazz = clazz; + this.readerThread = Thread.currentThread(); + } + + public Schema getSchema() { + return schema; + } + + @Override + public void close() throws IOException { + din.close(); + } + + @Override + public long getPos() { return 0; } + + @Override + public float getProgress() { return 0f; } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public V createValue() { + try { + return clazz.newInstance(); + } catch (Exception e) { + return null; + } + } + + @Override + public boolean next(NullWritable key, V value) throws IOException { + try { + // Need a way to know what thread to 
interrupt, since this is a blocking thread. + setReaderThread(Thread.currentThread()); + + value.readFields(din); + return true; + } catch (EOFException eof) { + // End of input. There should be a reader event available, or coming soon, so okay to be blocking call. + ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case DONE: + break; + default: + throw new IOException("Expected reader event with done status, but got " + + event.getEventType() + " with message " + event.getMessage()); + } + return false; + } catch (IOException io) { + if (Thread.interrupted()) { + // Either we were interrupted by one of: + // 1. handleEvent(), in which case there is a reader event waiting for us in the queue + // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming. + // Either way we should not try to block trying to read the reader events queue. + if (readerEvents.isEmpty()) { + // Case 2. + throw io; + } else { + // Case 1. Fail the reader, sending back the error we received from the reader event. + ReaderEvent event = getReaderEvent(); + switch (event.getEventType()) { + case ERROR: + throw new IOException("Received reader event error: " + event.getMessage()); + default: + throw new IOException("Got reader event type " + event.getEventType() + ", expected error event"); + } + } + } else { + // If we weren't interrupted, just propagate the error + throw io; + } + } + } + + /** + * Define success/error events which are passed to the reader from a different thread. + * The reader will check for these events on end of input and interruption of the reader thread. + */ + public static class ReaderEvent { + public enum EventType { + DONE, + ERROR + } + + protected final EventType eventType; + protected final String message; + + protected ReaderEvent(EventType type, String message) { + this.eventType = type; + this.message = message; + } + + public static ReaderEvent doneEvent() { + return new ReaderEvent(EventType.DONE, ""); + } + + public static ReaderEvent errorEvent(String message) { + return new ReaderEvent(EventType.ERROR, message); + } + + public EventType getEventType() { + return eventType; + } + + public String getMessage() { + return message; + } + } + + public void handleEvent(ReaderEvent event) { + switch (event.getEventType()) { + case DONE: + // Reader will check for the event queue upon the end of the input stream - no need to interrupt. + readerEvents.add(event); + break; + case ERROR: + readerEvents.add(event); + if (readerThread == null) { + throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage()); + } + // Reader is using a blocking socket .. interrupt it. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage()); + } + getReaderThread().interrupt(); + break; + default: + throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage()); + } + } + + protected ReaderEvent getReaderEvent() { + try { + ReaderEvent event = readerEvents.take(); + return event; + } catch (InterruptedException ie) { + throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie); + } + } + + protected synchronized void setReaderThread(Thread readerThread) { + this.readerThread = readerThread; + } + + protected synchronized Thread getReaderThread() { + return readerThread; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java index 17a0d2d..02aedfd 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java @@ -20,7 +20,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.llap.Schema; import org.apache.hadoop.mapred.InputSplitWithLocationInfo; import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.thrift.TDeserializer; @@ -93,17 +93,7 @@ public class LlapInputSplit implements InputSplitWithLocationInfo { out.writeUTF(locations[i].getLocation()); } - byte[] binarySchema; - - try { - TSerializer serializer = new TSerializer(); - byte[] serialzied = serializer.serialize(schema); - out.writeInt(serialzied.length); - out.write(serialzied); - } catch (Exception e) { - throw new IOException(e); - } - + schema.write(out); out.writeUTF(llapUser); } @@ -125,17 +115,8 @@ public class LlapInputSplit implements InputSplitWithLocationInfo { locations[i] = new SplitLocationInfo(in.readUTF(), false); } - length = in.readInt(); - - try { - byte[] schemaBytes = new byte[length]; - in.readFully(schemaBytes); - TDeserializer tDeserializer = new TDeserializer(); - schema = new Schema(); - tDeserializer.deserialize(schema, schemaBytes); - } catch (Exception e) { - throw new IOException(e); - } + schema = new Schema(); + schema.readFields(in); llapUser = in.readUTF(); } http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java deleted file mode 100644 index 64e5e69..0000000 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.DataInputStream;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.hive.metastore.api.Schema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
-  private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
-
-  DataInputStream din;
-  Schema schema;
-  Class<V> clazz;
-
-
-  protected Thread readerThread = null;
-  protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
-
-  public LlapRecordReader(InputStream in, Schema schema, Class<V> clazz) {
-    din = new DataInputStream(in);
-    this.schema = schema;
-    this.clazz = clazz;
-    this.readerThread = Thread.currentThread();
-  }
-
-  public Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public void close() throws IOException {
-    din.close();
-  }
-
-  @Override
-  public long getPos() { return 0; }
-
-  @Override
-  public float getProgress() { return 0f; }
-
-  @Override
-  public NullWritable createKey() {
-    return NullWritable.get();
-  }
-
-  @Override
-  public V createValue() {
-    try {
-      return clazz.newInstance();
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  @Override
-  public boolean next(NullWritable key, V value) throws IOException {
-    try {
-      // Need a way to know what thread to interrupt, since this is a blocking thread.
-      setReaderThread(Thread.currentThread());
-
-      value.readFields(din);
-      return true;
-    } catch (EOFException eof) {
-      // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
-      ReaderEvent event = getReaderEvent();
-      switch (event.getEventType()) {
-        case DONE:
-          break;
-        default:
-          throw new IOException("Expected reader event with done status, but got "
-              + event.getEventType() + " with message " + event.getMessage());
-      }
-      return false;
-    } catch (IOException io) {
-      if (Thread.interrupted()) {
-        // Either we were interrupted by one of:
-        // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
-        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
-        // Either way we should not try to block trying to read the reader events queue.
-        if (readerEvents.isEmpty()) {
-          // Case 2.
-          throw io;
-        } else {
-          // Case 1. Fail the reader, sending back the error we received from the reader event.
-          ReaderEvent event = getReaderEvent();
-          switch (event.getEventType()) {
-            case ERROR:
-              throw new IOException("Received reader event error: " + event.getMessage());
-            default:
-              throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
-          }
-        }
-      } else {
-        // If we weren't interrupted, just propagate the error
-        throw io;
-      }
-    }
-  }
-
-  /**
-   * Define success/error events which are passed to the reader from a different thread.
-   * The reader will check for these events on end of input and interruption of the reader thread.
-   */
-  public static class ReaderEvent {
-    public enum EventType {
-      DONE,
-      ERROR
-    }
-
-    protected final EventType eventType;
-    protected final String message;
-
-    protected ReaderEvent(EventType type, String message) {
-      this.eventType = type;
-      this.message = message;
-    }
-
-    public static ReaderEvent doneEvent() {
-      return new ReaderEvent(EventType.DONE, "");
-    }
-
-    public static ReaderEvent errorEvent(String message) {
-      return new ReaderEvent(EventType.ERROR, message);
-    }
-
-    public EventType getEventType() {
-      return eventType;
-    }
-
-    public String getMessage() {
-      return message;
-    }
-  }
-
-  public void handleEvent(ReaderEvent event) {
-    switch (event.getEventType()) {
-      case DONE:
-        // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
-        readerEvents.add(event);
-        break;
-      case ERROR:
-        readerEvents.add(event);
-        if (readerThread == null) {
-          throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
-        }
-        // Reader is using a blocking socket .. interrupt it.
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
-        }
-        getReaderThread().interrupt();
-        break;
-      default:
-        throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
-    }
-  }
-
-  protected ReaderEvent getReaderEvent() {
-    try {
-      ReaderEvent event = readerEvents.take();
-      return event;
-    } catch (InterruptedException ie) {
-      throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
-    }
-  }
-
-  protected synchronized void setReaderThread(Thread readerThread) {
-    this.readerThread = readerThread;
-  }
-
-  protected synchronized Thread getReaderThread() {
-    return readerThread;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 6267324..51027a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapInputSplit;
 import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.TypeDesc;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver;
@@ -71,6 +74,12 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -220,7 +229,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
       QueryPlan plan = driver.getPlan();
       List<Task<?>> roots = plan.getRootTasks();
-      Schema schema = plan.getResultSchema();
+      Schema schema = convertSchema(plan.getResultSchema());
 
       if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
         throw new HiveException("Was expecting a single TezTask.");
@@ -255,7 +264,7 @@
 
       plan = driver.getPlan();
       roots = plan.getRootTasks();
-      schema = plan.getResultSchema();
+      schema = convertSchema(plan.getResultSchema());
 
       if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
         throw new HiveException("Was expecting a single TezTask.");
@@ -416,6 +425,78 @@
     }
   }
 
+  private TypeDesc convertTypeString(String typeString) throws HiveException {
+    TypeDesc typeDesc;
+    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
+    Preconditions.checkState(typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE,
+        "Unsupported non-primitive type " + typeString);
+
+    switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+      case BOOLEAN:
+        typeDesc = new TypeDesc(TypeDesc.Type.BOOLEAN);
+        break;
+      case BYTE:
+        typeDesc = new TypeDesc(TypeDesc.Type.TINYINT);
+        break;
+      case SHORT:
+        typeDesc = new TypeDesc(TypeDesc.Type.SMALLINT);
+        break;
+      case INT:
+        typeDesc = new TypeDesc(TypeDesc.Type.INT);
+        break;
+      case LONG:
+        typeDesc = new TypeDesc(TypeDesc.Type.BIGINT);
+        break;
+      case FLOAT:
+        typeDesc = new TypeDesc(TypeDesc.Type.FLOAT);
+        break;
+      case DOUBLE:
+        typeDesc = new TypeDesc(TypeDesc.Type.DOUBLE);
+        break;
+      case STRING:
+        typeDesc = new TypeDesc(TypeDesc.Type.STRING);
+        break;
+      case CHAR:
+        CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
+        typeDesc = new TypeDesc(TypeDesc.Type.CHAR, charTypeInfo.getLength());
+        break;
+      case VARCHAR:
+        VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
+        typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR, varcharTypeInfo.getLength());
+        break;
+      case DATE:
+        typeDesc = new TypeDesc(TypeDesc.Type.DATE);
+        break;
+      case TIMESTAMP:
+        typeDesc = new TypeDesc(TypeDesc.Type.TIMESTAMP);
+        break;
+      case BINARY:
+        typeDesc = new TypeDesc(TypeDesc.Type.BINARY);
+        break;
+      case DECIMAL:
+        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+        typeDesc = new TypeDesc(TypeDesc.Type.DECIMAL, decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+        break;
+      default:
+        throw new HiveException("Unsupported type " + typeString);
+    }
+
+    return typeDesc;
+  }
+
+  private Schema convertSchema(Object obj) throws HiveException {
+    org.apache.hadoop.hive.metastore.api.Schema schema = (org.apache.hadoop.hive.metastore.api.Schema) obj;
+    List<FieldDesc> colDescs = new ArrayList<FieldDesc>();
+    for (FieldSchema fs : schema.getFieldSchemas()) {
+      String colName = fs.getName();
+      String typeString = fs.getType();
+      TypeDesc typeDesc = convertTypeString(typeString);
+      colDescs.add(new FieldDesc(colName, typeDesc));
+    }
+    return new Schema(colDescs);
+  }
+
   @Override
   public void close() throws HiveException {
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 7b516fe..37e21b8 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
 
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 
@@ -112,10 +113,13 @@ public class TestLlapOutputFormat {
     writer.close(null);
 
     InputStream in = socket.getInputStream();
-    RecordReader reader = new LlapRecordReader(in, null, Text.class);
+    LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class);
 
     LOG.debug("Have record reader");
 
+    // Send done event, which LlapBaseRecordReader is expecting upon end of input
+    reader.handleEvent(ReaderEvent.doneEvent());
+
     int count = 0;
     while(reader.next(NullWritable.get(), text)) {
      LOG.debug(text.toString());