Repository: hive Updated Branches: refs/heads/llap fc7343dd1 -> 7b9096a92
http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java deleted file mode 100644 index 4e000ff..0000000 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java +++ /dev/null @@ -1,155 +0,0 @@ -package org.apache.hadoop.hive.llap; - -import com.google.common.base.Preconditions; - -import java.io.IOException; -import java.util.List; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.JobConf; - -import org.apache.hadoop.hive.llap.Row; -import org.apache.hadoop.hive.llap.FieldDesc; -import org.apache.hadoop.hive.llap.Schema; -import org.apache.hadoop.hive.llap.TypeDesc; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.io.HiveCharWritable; -import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class LlapRowRecordReader implements RecordReader<NullWritable, Row> { - - private static final Logger LOG = 
LoggerFactory.getLogger(LlapRowRecordReader.class); - - Configuration conf; - RecordReader<NullWritable, Text> reader; - Schema schema; - SerDe serde; - final Text textData = new Text(); - - public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) { - this.conf = conf; - this.schema = schema; - this.reader = reader; - } - - @Override - public void close() throws IOException { - reader.close(); - } - - @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public Row createValue() { - return new Row(schema); - } - - @Override - public long getPos() throws IOException { - return 0; - } - - @Override - public float getProgress() throws IOException { - return 0; - } - - @Override - public boolean next(NullWritable key, Row value) throws IOException { - Preconditions.checkArgument(value != null); - - if (serde == null) { - try { - serde = initSerDe(conf); - } catch (SerDeException err) { - throw new IOException(err); - } - } - - boolean hasNext = reader.next(key, textData); - if (hasNext) { - // Deserialize Text to column values, and populate the row record - Object rowObj; - try { - StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector(); - rowObj = serde.deserialize(textData); - List<? 
extends StructField> colFields = rowOI.getAllStructFieldRefs(); - for (int idx = 0; idx < colFields.size(); ++idx) { - StructField field = colFields.get(idx); - Object colValue = rowOI.getStructFieldData(rowObj, field); - Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE, - "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName()); - - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector(); - // char/varchar special cased here since the row record handles them using Text - switch (poi.getPrimitiveCategory()) { - case CHAR: - value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue()); - break; - case VARCHAR: - value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue()); - break; - default: - value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue)); - break; - } - } - } catch (SerDeException err) { - if (LOG.isDebugEnabled()) { - LOG.debug("Error deserializing row from text: " + textData); - } - throw new IOException("Error deserializing row data", err); - } - } - - return hasNext; - } - - public Schema getSchema() { - return schema; - } - - protected SerDe initSerDe(Configuration conf) throws SerDeException { - Properties props = new Properties(); - StringBuffer columnsBuffer = new StringBuffer(); - StringBuffer typesBuffer = new StringBuffer(); - boolean isFirst = true; - for (FieldDesc colDesc : schema.getColumns()) { - if (!isFirst) { - columnsBuffer.append(','); - typesBuffer.append(','); - } - columnsBuffer.append(colDesc.getName()); - typesBuffer.append(colDesc.getTypeDesc().toString()); - isFirst = false; - } - String columns = columnsBuffer.toString(); - String types = typesBuffer.toString(); - props.put(serdeConstants.LIST_COLUMNS, columns); - props.put(serdeConstants.LIST_COLUMN_TYPES, types); - SerDe serde = new LazySimpleSerDe(); - 
serde.initialize(conf, props); - - return serde; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml new file mode 100644 index 0000000..5a7e385 --- /dev/null +++ b/llap-ext-client/pom.xml @@ -0,0 +1,140 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hive</groupId> + <artifactId>hive</artifactId> + <version>2.1.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>hive-llap-ext-client</artifactId> + <packaging>jar</packaging> + <name>Hive Llap External Client</name> + + <properties> + <hive.path.to.root>..</hive.path.to.root> + </properties> + + <dependencies> + <!-- dependencies are always listed in sorted order by groupId, artifactId --> + <!-- intra-project --> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-llap-client</artifactId> + <version>${project.version}</version> + 
</dependency> + + <!-- inter-project --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-registry</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + </dependency> + <!-- test inter-project --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito-all.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + <build> + <sourceDirectory>${basedir}/src/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test</testSourceDirectory> + <plugins> + <plugin> + 
<groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/gen/protobuf/gen-java</source> + <source>src/gen/thrift/gen-javabean</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java new file mode 100644 index 0000000..61eb2ea --- /dev/null +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap; + +import java.util.ArrayList; +import java.util.List; + +import java.sql.SQLException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.DriverManager; + +import java.io.IOException; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataInputStream; +import java.io.ByteArrayInputStream; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; +import org.apache.hadoop.mapred.SplitLocationInfo; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; +import org.apache.hive.llap.ext.LlapInputSplit; + +import com.google.common.base.Preconditions; + +public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { + + private static String driverName = "org.apache.hive.jdbc.HiveDriver"; + private String url; // "jdbc:hive2://localhost:10000/default" + private String user; // "hive", + private String pwd; // "" + private String query; + + public static final String URL_KEY = "llap.if.hs2.connection"; + public static final String QUERY_KEY = "llap.if.query"; + public static final String USER_KEY = "llap.if.user"; + public static final String PWD_KEY = "llap.if.pwd"; + + public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; + + private Connection con; + private Statement stmt; + + 
public LlapBaseInputFormat(String url, String user, String pwd, String query) { + this.url = url; + this.user = user; + this.pwd = pwd; + this.query = query; + } + + public LlapBaseInputFormat() {} + + + @Override + public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + LlapInputSplit llapSplit = (LlapInputSplit) split; + return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + List<InputSplit> ins = new ArrayList<InputSplit>(); + + if (url == null) url = job.get(URL_KEY); + if (query == null) query = job.get(QUERY_KEY); + if (user == null) user = job.get(USER_KEY); + if (pwd == null) pwd = job.get(PWD_KEY); + + if (url == null || query == null) { + throw new IllegalStateException(); + } + + try { + Class.forName(driverName); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + + try { + con = DriverManager.getConnection(url,user,pwd); + stmt = con.createStatement(); + String sql = String.format(SPLIT_QUERY, query, numSplits); + ResultSet res = stmt.executeQuery(sql); + while (res.next()) { + // deserialize split + DataInput in = new DataInputStream(res.getBinaryStream(3)); + InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); + is.readFields(in); + ins.add(new LlapInputSplit(is, res.getString(1))); + } + + res.close(); + stmt.close(); + } catch (Exception e) { + throw new IOException(e); + } + return ins.toArray(new InputSplit[ins.size()]); + } + + public void close() { + try { + con.close(); + } catch (Exception e) { + // ignore + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java ---------------------------------------------------------------------- diff --git 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java new file mode 100644 index 0000000..ce419af --- /dev/null +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap; + +import java.io.OutputStream; +import java.io.InputStream; +import java.io.File; +import java.io.IOException; +import java.io.FileInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.RCFile.Reader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.hive.llap.LlapBaseRecordReader; +import org.apache.hadoop.hive.llap.Schema; + +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; + +public class LlapDump { + + private static final Logger LOG = LoggerFactory.getLogger(LlapDump.class); + + private static String url = "jdbc:hive2://localhost:10000/default"; + private static String user = "hive"; + private static String pwd = ""; + private static String query = "select * from test"; + private static String numSplits = "1"; + + public static void main(String[] args) throws Exception { + Options opts = createOptions(); + CommandLine cli = new GnuParser().parse(opts, args); + + if (cli.hasOption('h')) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("orcfiledump", opts); + return; + } + + if (cli.hasOption('l')) { + 
url = cli.getOptionValue("l"); + } + + if (cli.hasOption('u')) { + user = cli.getOptionValue("u"); + } + + if (cli.hasOption('p')) { + pwd = cli.getOptionValue("p"); + } + + if (cli.hasOption('n')) { + numSplits = cli.getOptionValue("n"); + } + + if (cli.getArgs().length > 0) { + query = cli.getArgs()[0]; + } + + System.out.println("url: "+url); + System.out.println("user: "+user); + System.out.println("query: "+query); + + LlapBaseInputFormat format = new LlapBaseInputFormat(url, user, pwd, query); + JobConf job = new JobConf(); + + InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits)); + + if (splits.length == 0) { + System.out.println("No splits returned - empty scan"); + System.out.println("Results: "); + } else { + boolean first = true; + + for (InputSplit s: splits) { + LOG.info("Processing input split s from " + Arrays.toString(s.getLocations())); + RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null); + + if (reader instanceof LlapBaseRecordReader && first) { + Schema schema = ((LlapBaseRecordReader)reader).getSchema(); + System.out.println(""+schema); + } + + if (first) { + System.out.println("Results: "); + System.out.println(""); + first = false; + } + + Text value = reader.createValue(); + while (reader.next(NullWritable.get(), value)) { + System.out.println(value); + } + } + System.exit(0); + } + } + + static Options createOptions() { + Options result = new Options(); + + result.addOption(OptionBuilder + .withLongOpt("location") + .withDescription("HS2 url") + .hasArg() + .create('l')); + + result.addOption(OptionBuilder + .withLongOpt("user") + .withDescription("user name") + .hasArg() + .create('u')); + + result.addOption(OptionBuilder + .withLongOpt("pwd") + .withDescription("password") + .hasArg() + .create('p')); + + result.addOption(OptionBuilder + .withLongOpt("num") + .withDescription("number of splits") + .hasArg() + .create('n')); + + return result; + } +} 
http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java new file mode 100644 index 0000000..6ecb0f9 --- /dev/null +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.hive.llap; + +import java.io.IOException; + +import org.apache.hadoop.hive.llap.LlapBaseRecordReader; +import org.apache.hadoop.hive.llap.LlapRowRecordReader; +import org.apache.hadoop.hive.llap.Row; +import org.apache.hadoop.hive.llap.Schema; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hive.llap.ext.LlapInputSplit; + + +public class LlapRowInputFormat implements InputFormat<NullWritable, Row> { + LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>(); + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + return baseInputFormat.getSplits(job, numSplits); + } + + @Override + public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter) + throws IOException { + LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split; + LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter); + return new LlapRowRecordReader(job, reader.getSchema(), reader); + } +} 
http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java b/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java new file mode 100644 index 0000000..d8881c4 --- /dev/null +++ b/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java @@ -0,0 +1,73 @@ +package org.apache.hive.llap.ext; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; +import org.apache.hadoop.mapred.SplitLocationInfo; + + +public class LlapInputSplit<V extends WritableComparable> implements InputSplitWithLocationInfo { + InputSplitWithLocationInfo nativeSplit; + String inputFormatClassName; + + public LlapInputSplit() { + } + + public LlapInputSplit(InputSplitWithLocationInfo nativeSplit, String inputFormatClassName) { + this.nativeSplit = nativeSplit; + this.inputFormatClassName = inputFormatClassName; + } + + @Override + public long getLength() throws IOException { + return nativeSplit.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return nativeSplit.getLocations(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(inputFormatClassName); + out.writeUTF(nativeSplit.getClass().getName()); + nativeSplit.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + inputFormatClassName = in.readUTF(); + String splitClass = in.readUTF(); + try { + nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance(); + } catch (Exception e) { + throw new 
IOException(e); + } + nativeSplit.readFields(in); + } + + @Override + public SplitLocationInfo[] getLocationInfo() throws IOException { + return nativeSplit.getLocationInfo(); + } + + public InputSplit getSplit() { + return nativeSplit; + } + + public InputFormat<NullWritable, V> getInputFormat() throws IOException { + try { + return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName) + .newInstance(); + } catch(Exception e) { + throw new IOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java deleted file mode 100644 index 0930d60..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.llap; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; - -import org.apache.commons.collections4.ListUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; -import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; -import org.apache.hadoop.hive.llap.tez.Converters; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.common.security.JobTokenIdentifier; 
-import org.apache.tez.common.security.TokenCache; -import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; -import org.apache.tez.runtime.api.impl.TaskSpec; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.impl.EventType; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; -import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; - - -public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { - - private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); - - public LlapInputFormat() { - } - - /* - * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire - * off the work in the split to LLAP and finally return the connected socket back in an - * LlapRecordReader. The LlapRecordReader class reads the results from the socket. - */ - @Override - public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { - - LlapInputSplit llapSplit = (LlapInputSplit) split; - - // Set conf to use LLAP user rather than current user for LLAP Zk registry. 
- HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser()); - SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes()); - - ServiceInstance serviceInstance = getServiceInstance(job, llapSplit); - String host = serviceInstance.getHost(); - int llapSubmitPort = serviceInstance.getRpcPort(); - - LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort - + " and outputformat port " + serviceInstance.getOutputFormatPort()); - - LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = - new LlapRecordReaderTaskUmbilicalExternalResponder(); - LlapTaskUmbilicalExternalClient llapClient = - new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), - submitWorkInfo.getToken(), umbilicalResponder); - llapClient.init(job); - llapClient.start(); - - SubmitWorkRequestProto submitWorkRequestProto = - constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), - llapClient.getAddress(), submitWorkInfo.getToken()); - - TezEvent tezEvent = new TezEvent(); - DataInputBuffer dib = new DataInputBuffer(); - dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length); - tezEvent.readFields(dib); - List<TezEvent> tezEventList = Lists.newArrayList(); - tezEventList.add(tezEvent); - - llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList); - - String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum(); - - HiveConf conf = new HiveConf(); - Socket socket = new Socket(host, - serviceInstance.getOutputFormatPort()); - - LOG.debug("Socket connected"); - - socket.getOutputStream().write(id.getBytes()); - socket.getOutputStream().write(0); - socket.getOutputStream().flush(); - - LOG.info("Registered id: " + id); - - LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); - 
umbilicalResponder.setRecordReader(recordReader); - return recordReader; - } - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - throw new IOException("These are not the splits you are looking for."); - } - - private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { - LlapRegistryService registryService = LlapRegistryService.getClient(job); - String host = llapSplit.getLocations()[0]; - - ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host); - if (serviceInstance == null) { - throw new IOException("No service instances found for " + host + " in registry"); - } - - return serviceInstance; - } - - private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { - InetAddress address = InetAddress.getByName(host); - ServiceInstanceSet instanceSet = registryService.getInstances(); - ServiceInstance serviceInstance = null; - - // The name used in the service registry may not match the host name we're using. 
- // Try hostname/canonical hostname/host address - - String name = address.getHostName(); - LOG.info("Searching service instance by hostname " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - name = address.getCanonicalHostName(); - LOG.info("Searching service instance by canonical hostname " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - name = address.getHostAddress(); - LOG.info("Searching service instance by address " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - return serviceInstance; - } - - private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) { - if (serviceInstances == null || serviceInstances.isEmpty()) { - return null; - } - - // Get the first live service instance - for (ServiceInstance serviceInstance : serviceInstances) { - if (serviceInstance.isAlive()) { - return serviceInstance; - } - } - - LOG.info("No live service instances were found"); - return null; - } - - private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, - int taskNum, - InetSocketAddress address, - Token<JobTokenIdentifier> token) throws - IOException { - TaskSpec taskSpec = submitWorkInfo.getTaskSpec(); - ApplicationId appId = submitWorkInfo.getFakeAppId(); - - SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); - // This works, assuming the executor is running within YARN. 
- LOG.info("Setting user in submitWorkRequest to: " + - System.getenv(ApplicationConstants.Environment.USER.name())); - builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); - builder.setApplicationIdString(appId.toString()); - builder.setAppAttemptNumber(0); - builder.setTokenIdentifier(appId.toString()); - - ContainerId containerId = - ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); - builder.setContainerIdString(containerId.toString()); - - builder.setAmHost(address.getHostName()); - builder.setAmPort(address.getPort()); - Credentials taskCredentials = new Credentials(); - // Credentials can change across DAGs. Ideally construct only once per DAG. - // TODO Figure out where credentials will come from. Normally Hive sets up - // URLs on the tez dag, for which Tez acquires credentials. - - // taskCredentials.addAll(getContext().getCredentials()); - - // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == - // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); - // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); - // if (credentialsBinary == null) { - // credentialsBinary = serializeCredentials(getContext().getCredentials()); - // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); - // } else { - // credentialsBinary = credentialsBinary.duplicate(); - // } - // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); - Credentials credentials = new Credentials(); - TokenCache.setSessionToken(token, credentials); - ByteBuffer credentialsBinary = serializeCredentials(credentials); - builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); - - - builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec)); - - FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder(); - runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis()); - 
runtimeInfo.setWithinDagPriority(0); - runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime()); - runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime()); - runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism()); - runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0); - - - builder.setUsingTezAm(false); - builder.setFragmentRuntimeInfo(runtimeInfo.build()); - return builder.build(); - } - - private ByteBuffer serializeCredentials(Credentials credentials) throws IOException { - Credentials containerCredentials = new Credentials(); - containerCredentials.addAll(credentials); - DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); - containerCredentials.writeTokenStorageToStream(containerTokens_dob); - return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); - } - - private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder { - protected LlapBaseRecordReader recordReader = null; - protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>(); - - public LlapRecordReaderTaskUmbilicalExternalResponder() { - } - - @Override - public void submissionFailed(String fragmentId, Throwable throwable) { - try { - sendOrQueueEvent(ReaderEvent.errorEvent( - "Received submission failed event for fragment ID " + fragmentId)); - } catch (Exception err) { - LOG.error("Error during heartbeat responder:", err); - } - } - - @Override - public void heartbeat(TezHeartbeatRequest request) { - TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); - List<TezEvent> inEvents = request.getEvents(); - for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { - EventType eventType = tezEvent.getEventType(); - try { - switch (eventType) { - case TASK_ATTEMPT_COMPLETED_EVENT: - sendOrQueueEvent(ReaderEvent.doneEvent()); - break; - case TASK_ATTEMPT_FAILED_EVENT: - TaskAttemptFailedEvent taskFailedEvent = 
(TaskAttemptFailedEvent) tezEvent.getEvent(); - sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics())); - break; - case TASK_STATUS_UPDATE_EVENT: - // If we want to handle counters - break; - default: - LOG.warn("Unhandled event type " + eventType); - break; - } - } catch (Exception err) { - LOG.error("Error during heartbeat responder:", err); - } - } - } - - @Override - public void taskKilled(TezTaskAttemptID taskAttemptId) { - try { - sendOrQueueEvent(ReaderEvent.errorEvent( - "Received task killed event for task ID " + taskAttemptId)); - } catch (Exception err) { - LOG.error("Error during heartbeat responder:", err); - } - } - - @Override - public void heartbeatTimeout(String taskAttemptId) { - try { - sendOrQueueEvent(ReaderEvent.errorEvent( - "Timed out waiting for heartbeat for task ID " + taskAttemptId)); - } catch (Exception err) { - LOG.error("Error during heartbeat responder:", err); - } - } - - public synchronized LlapBaseRecordReader getRecordReader() { - return recordReader; - } - - public synchronized void setRecordReader(LlapBaseRecordReader recordReader) { - this.recordReader = recordReader; - - if (recordReader == null) { - return; - } - - // If any events were queued by the responder, give them to the record reader now. - while (!queuedEvents.isEmpty()) { - ReaderEvent readerEvent = queuedEvents.poll(); - LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType()); - recordReader.handleEvent(readerEvent); - } - } - - /** - * Send the ReaderEvents to the record reader, if it is registered to this responder. - * If there is no registered record reader, add them to a list of pending reader events - * since we don't want to drop these events. 
- * @param readerEvent - */ - protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) { - LlapBaseRecordReader recordReader = getRecordReader(); - if (recordReader != null) { - recordReader.handleEvent(readerEvent); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType() - + " with message " + readerEvent.getMessage()); - } - - try { - queuedEvents.put(readerEvent); - } catch (Exception err) { - throw new RuntimeException("Unexpected exception while queueing reader event", err); - } - } - } - - /** - * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader. - */ - public void clearQueuedEvents() { - queuedEvents.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java deleted file mode 100644 index 7d06637..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ /dev/null @@ -1,415 +0,0 @@ -package org.apache.hadoop.hive.llap.ext; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Preconditions; -import org.apache.commons.collections4.ListUtils; -import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; -import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy; -import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtocolSignature; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.tez.common.security.JobTokenIdentifier; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.impl.EventType; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; -import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class LlapTaskUmbilicalExternalClient extends AbstractService { - - private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class); - - private final LlapProtocolClientProxy communicator; - private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer; - private final Configuration conf; - private final LlapTaskUmbilicalProtocol umbilical; - - protected final String tokenIdentifier; - protected final Token<JobTokenIdentifier> sessionToken; - - private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>(); - private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks= new ConcurrentHashMap<String, TaskHeartbeatInfo>(); - private LlapTaskUmbilicalExternalResponder responder = null; - private final 
ScheduledThreadPoolExecutor timer; - private final long connectionTimeout; - - private static class TaskHeartbeatInfo { - final String taskAttemptId; - final String hostname; - final int port; - final AtomicLong lastHeartbeat = new AtomicLong(); - - public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) { - this.taskAttemptId = taskAttemptId; - this.hostname = hostname; - this.port = port; - this.lastHeartbeat.set(System.currentTimeMillis()); - } - } - - private static class PendingEventData { - final TaskHeartbeatInfo heartbeatInfo; - final List<TezEvent> tezEvents; - - public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) { - this.heartbeatInfo = heartbeatInfo; - this.tezEvents = tezEvents; - } - } - - // TODO KKK Work out the details of the tokenIdentifier, and the session token. - // It may just be possible to create one here - since Shuffle is not involved, and this is only used - // for communication from LLAP-Daemons to the server. It will need to be sent in as part - // of the job submission request. - public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, - Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) { - super(LlapTaskUmbilicalExternalClient.class.getName()); - this.conf = conf; - this.umbilical = new LlapTaskUmbilicalExternalImpl(); - this.tokenIdentifier = tokenIdentifier; - this.sessionToken = sessionToken; - this.responder = responder; - this.timer = new ScheduledThreadPoolExecutor(1); - this.connectionTimeout = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); - // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough. 
- this.communicator = new LlapProtocolClientProxy(1, conf, null); - this.communicator.init(conf); - } - - @Override - public void serviceStart() throws IOException { - int numHandlers = HiveConf.getIntVar(conf, - HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS); - llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken); - communicator.start(); - } - - @Override - public void serviceStop() { - llapTaskUmbilicalServer.shutdownServer(); - timer.shutdown(); - if (this.communicator != null) { - this.communicator.stop(); - } - } - - public InetSocketAddress getAddress() { - return llapTaskUmbilicalServer.getAddress(); - } - - - /** - * Submit the work for actual execution. This should always have the usingTezAm flag disabled - * @param submitWorkRequestProto - */ - public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) { - Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false); - - // Register the pending events to be sent for this spec. 
- String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(); - PendingEventData pendingEventData = new PendingEventData( - new TaskHeartbeatInfo(fragmentId, llapHost, llapPort), - tezEvents); - pendingEvents.putIfAbsent(fragmentId, pendingEventData); - - // Setup timer task to check for hearbeat timeouts - timer.scheduleAtFixedRate(new HeartbeatCheckTask(), - connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS); - - // Send out the actual SubmitWorkRequest - communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort, - new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() { - - @Override - public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) { - if (response.hasSubmissionState()) { - if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) { - String msg = "Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy."; - LOG.info(msg); - if (responder != null) { - Throwable err = new RuntimeException(msg); - responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err); - } - return; - } - } - } - - @Override - public void indicateError(Throwable t) { - String msg = "Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(); - LOG.error(msg, t); - Throwable err = new RuntimeException(msg, t); - responder.submissionFailed(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), err); - } - }); - - - - - -// // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment. -// // This entire call is only required if we're doing more than scans. 
MRInput has no dependencies and is always finishable -// QueryIdentifierProto queryIdentifier = QueryIdentifierProto -// .newBuilder() -// .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId()) -// .build(); -// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest = -// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState( -// LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED). -// setSrcName(TODO) -// communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set); - - - } - - private void updateHeartbeatInfo(String taskAttemptId) { - int updateCount = 0; - - PendingEventData pendingEventData = pendingEvents.get(taskAttemptId); - if (pendingEventData != null) { - pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); - updateCount++; - } - - TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId); - if (heartbeatInfo != null) { - heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); - updateCount++; - } - - if (updateCount == 0) { - LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId); - } - } - - private void updateHeartbeatInfo(String hostname, int port) { - int updateCount = 0; - - for (String key : pendingEvents.keySet()) { - PendingEventData pendingEventData = pendingEvents.get(key); - if (pendingEventData != null) { - if (pendingEventData.heartbeatInfo.hostname.equals(hostname) - && pendingEventData.heartbeatInfo.port == port) { - pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); - updateCount++; - } - } - } - - for (String key : registeredTasks.keySet()) { - TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key); - if (heartbeatInfo != null) { - if 
(heartbeatInfo.hostname.equals(hostname) - && heartbeatInfo.port == port) { - heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis()); - updateCount++; - } - } - } - - if (updateCount == 0) { - LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port); - } - } - - private class HeartbeatCheckTask implements Runnable { - public void run() { - long currentTime = System.currentTimeMillis(); - List<String> timedOutTasks = new ArrayList<String>(); - - // Check both pending and registered tasks for timeouts - for (String key : pendingEvents.keySet()) { - PendingEventData pendingEventData = pendingEvents.get(key); - if (pendingEventData != null) { - if (currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) { - timedOutTasks.add(key); - } - } - } - for (String timedOutTask : timedOutTasks) { - LOG.info("Pending taskAttemptId " + timedOutTask + " timed out"); - responder.heartbeatTimeout(timedOutTask); - pendingEvents.remove(timedOutTask); - // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task? - } - - timedOutTasks.clear(); - for (String key : registeredTasks.keySet()) { - TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key); - if (heartbeatInfo != null) { - if (currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout) { - timedOutTasks.add(key); - } - } - } - for (String timedOutTask : timedOutTasks) { - LOG.info("Running taskAttemptId " + timedOutTask + " timed out"); - responder.heartbeatTimeout(timedOutTask); - registeredTasks.remove(timedOutTask); - // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task? 
- } - } - } - - public interface LlapTaskUmbilicalExternalResponder { - void submissionFailed(String fragmentId, Throwable throwable); - void heartbeat(TezHeartbeatRequest request); - void taskKilled(TezTaskAttemptID taskAttemptId); - void heartbeatTimeout(String fragmentId); - } - - - - // TODO Ideally, the server should be shared across all client sessions running on the same node. - private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol { - - @Override - public boolean canCommit(TezTaskAttemptID taskid) throws IOException { - // Expecting only a single instance of a task to be running. - return true; - } - - @Override - public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, - TezException { - // Keep-alive information. The client should be informed and will have to take care of re-submitting the work. - // Some parts of fault tolerance go here. - - // This also provides completion information, and a possible notification when task actually starts running (first heartbeat) - - if (LOG.isDebugEnabled()) { - LOG.debug("Received heartbeat from container, request=" + request); - } - - // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans. - TezHeartbeatResponse response = new TezHeartbeatResponse(); - - response.setLastRequestId(request.getRequestId()); - // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this. 
- TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); - String taskAttemptIdString = taskAttemptId.toString(); - - updateHeartbeatInfo(taskAttemptIdString); - - List<TezEvent> tezEvents = null; - PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString); - if (pendingEventData == null) { - tezEvents = Collections.emptyList(); - - // If this heartbeat was not from a pending event and it's not in our list of registered tasks, - if (!registeredTasks.containsKey(taskAttemptIdString)) { - LOG.info("Unexpected heartbeat from " + taskAttemptIdString); - response.setShouldDie(); // Do any of the other fields need to be set? - return response; - } - } else { - tezEvents = pendingEventData.tezEvents; - // Tasks removed from the pending list should then be added to the registered list. - registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo); - } - - response.setLastRequestId(request.getRequestId()); - // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task. - // Also since we have all the MRInput events here - they'll all be sent in together. - response.setNextFromEventId(0); // Irrelevant. See comment above. - response.setNextPreRoutedEventId(0); //Irrelevant. See comment above. - response.setEvents(tezEvents); - - List<TezEvent> inEvents = request.getEvents(); - if (LOG.isDebugEnabled()) { - LOG.debug("Heartbeat from " + taskAttemptIdString + - " events: " + (inEvents != null ? 
inEvents.size() : -1)); - } - for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { - EventType eventType = tezEvent.getEventType(); - switch (eventType) { - case TASK_ATTEMPT_COMPLETED_EVENT: - LOG.debug("Task completed event for " + taskAttemptIdString); - registeredTasks.remove(taskAttemptIdString); - break; - case TASK_ATTEMPT_FAILED_EVENT: - LOG.debug("Task failed event for " + taskAttemptIdString); - registeredTasks.remove(taskAttemptIdString); - break; - case TASK_STATUS_UPDATE_EVENT: - // If we want to handle counters - LOG.debug("Task update event for " + taskAttemptIdString); - break; - default: - LOG.warn("Unhandled event type " + eventType); - break; - } - } - - // Pass the request on to the responder - try { - if (responder != null) { - responder.heartbeat(request); - } - } catch (Exception err) { - LOG.error("Error during responder execution", err); - } - - return response; - } - - @Override - public void nodeHeartbeat(Text hostname, int port) throws IOException { - updateHeartbeatInfo(hostname.toString(), port); - // No need to propagate to this to the responder - } - - @Override - public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException { - String taskAttemptIdString = taskAttemptId.toString(); - LOG.error("Task killed - " + taskAttemptIdString); - registeredTasks.remove(taskAttemptIdString); - - try { - if (responder != null) { - responder.taskKilled(taskAttemptId); - } - } catch (Exception err) { - LOG.error("Error during responder execution", err); - } - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return 0; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, - int clientMethodsHash) throws IOException { - return ProtocolSignature.getProtocolSignature(this, protocol, - clientVersion, clientMethodsHash); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java deleted file mode 100644 index dbd591a..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.apache.hadoop.hive.llap.tezplugins.helpers; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.tez.common.security.JobTokenIdentifier; -import org.apache.tez.common.security.JobTokenSecretManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LlapTaskUmbilicalServer { - - private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalServer.class); - - protected volatile Server server; - private final InetSocketAddress address; - private final AtomicBoolean started = new AtomicBoolean(true); - - public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws - IOException { - JobTokenSecretManager jobTokenSecretManager = - new JobTokenSecretManager(); - jobTokenSecretManager.addTokenForJob(tokenIdentifier, token); - - server = new RPC.Builder(conf) - .setProtocol(LlapTaskUmbilicalProtocol.class) - .setBindAddress("0.0.0.0") - .setPort(0) - 
.setInstance(umbilical) - .setNumHandlers(numHandlers) - .setSecretManager(jobTokenSecretManager).build(); - - server.start(); - this.address = NetUtils.getConnectAddress(server); - LOG.info( - "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address + - " with numHandlers=" + numHandlers); - } - - public InetSocketAddress getAddress() { - return this.address; - } - - public void shutdownServer() { - if (started.get()) { // Primarily to avoid multiple shutdowns. - started.set(false); - server.stop(); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2337e89..f773d2f 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ <module>service</module> <module>llap-common</module> <module>llap-client</module> + <module>llap-ext-client</module> <module>llap-tez</module> <module>llap-server</module> <module>shims</module> http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java deleted file mode 100644 index 7073280..0000000 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.DataInputStream; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.Schema; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.JobConf; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> { - private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class); - - DataInputStream din; - Schema schema; - Class<V> clazz; - - - protected Thread readerThread = null; - protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>(); - - public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) { - din = new DataInputStream(in); - this.schema = schema; - this.clazz = clazz; - this.readerThread = Thread.currentThread(); - } - - public Schema getSchema() { - return schema; - } - - @Override - public void close() throws IOException { - din.close(); - } - - @Override - public long getPos() { return 0; } - - @Override - public float getProgress() { return 0f; } - - @Override - public NullWritable 
createKey() { - return NullWritable.get(); - } - - @Override - public V createValue() { - try { - return clazz.newInstance(); - } catch (Exception e) { - return null; - } - } - - @Override - public boolean next(NullWritable key, V value) throws IOException { - try { - // Need a way to know what thread to interrupt, since this is a blocking thread. - setReaderThread(Thread.currentThread()); - - value.readFields(din); - return true; - } catch (EOFException eof) { - // End of input. There should be a reader event available, or coming soon, so okay to be blocking call. - ReaderEvent event = getReaderEvent(); - switch (event.getEventType()) { - case DONE: - break; - default: - throw new IOException("Expected reader event with done status, but got " - + event.getEventType() + " with message " + event.getMessage()); - } - return false; - } catch (IOException io) { - if (Thread.interrupted()) { - // Either we were interrupted by one of: - // 1. handleEvent(), in which case there is a reader event waiting for us in the queue - // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming. - // Either way we should not try to block trying to read the reader events queue. - if (readerEvents.isEmpty()) { - // Case 2. - throw io; - } else { - // Case 1. Fail the reader, sending back the error we received from the reader event. - ReaderEvent event = getReaderEvent(); - switch (event.getEventType()) { - case ERROR: - throw new IOException("Received reader event error: " + event.getMessage()); - default: - throw new IOException("Got reader event type " + event.getEventType() + ", expected error event"); - } - } - } else { - // If we weren't interrupted, just propagate the error - throw io; - } - } - } - - /** - * Define success/error events which are passed to the reader from a different thread. - * The reader will check for these events on end of input and interruption of the reader thread. 
- */ - public static class ReaderEvent { - public enum EventType { - DONE, - ERROR - } - - protected final EventType eventType; - protected final String message; - - protected ReaderEvent(EventType type, String message) { - this.eventType = type; - this.message = message; - } - - public static ReaderEvent doneEvent() { - return new ReaderEvent(EventType.DONE, ""); - } - - public static ReaderEvent errorEvent(String message) { - return new ReaderEvent(EventType.ERROR, message); - } - - public EventType getEventType() { - return eventType; - } - - public String getMessage() { - return message; - } - } - - public void handleEvent(ReaderEvent event) { - switch (event.getEventType()) { - case DONE: - // Reader will check for the event queue upon the end of the input stream - no need to interrupt. - readerEvents.add(event); - break; - case ERROR: - readerEvents.add(event); - if (readerThread == null) { - throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage()); - } - // Reader is using a blocking socket .. interrupt it. 
- if (LOG.isDebugEnabled()) { - LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage()); - } - getReaderThread().interrupt(); - break; - default: - throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage()); - } - } - - protected ReaderEvent getReaderEvent() { - try { - ReaderEvent event = readerEvents.take(); - return event; - } catch (InterruptedException ie) { - throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie); - } - } - - protected synchronized void setReaderThread(Thread readerThread) { - this.readerThread = readerThread; - } - - protected synchronized Thread getReaderThread() { - return readerThread; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java deleted file mode 100644 index 02aedfd..0000000 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.hive.llap.Schema; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; -import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TSerializer; - -public class LlapInputSplit implements InputSplitWithLocationInfo { - - int splitNum; - byte[] planBytes; - byte[] fragmentBytes; - SplitLocationInfo[] locations; - Schema schema; - String llapUser; - - public LlapInputSplit() { - } - - public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema, String llapUser) { - this.planBytes = planBytes; - this.fragmentBytes = fragmentBytes; - this.locations = locations; - this.schema = schema; - this.splitNum = splitNum; - this.llapUser = llapUser; - } - - public Schema getSchema() { - return schema; - } - - @Override - public long getLength() throws IOException { - return 0; - } - - @Override - public String[] getLocations() throws IOException { - String[] locs = new String[locations.length]; - for (int i = 0; i < locations.length; ++i) { - locs[i] = locations[i].getLocation(); - } - return locs; - } - - public int getSplitNum() { - return splitNum; - } - - public byte[] getPlanBytes() { - return planBytes; - } - - public byte[] getFragmentBytes() { - return fragmentBytes; - } - - - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(splitNum); - out.writeInt(planBytes.length); - out.write(planBytes); - - out.writeInt(fragmentBytes.length); - out.write(fragmentBytes); - - out.writeInt(locations.length); - for (int i = 0; i < locations.length; ++i) { - out.writeUTF(locations[i].getLocation()); - } - - schema.write(out); - out.writeUTF(llapUser); - } - - @Override 
- public void readFields(DataInput in) throws IOException { - splitNum = in.readInt(); - int length = in.readInt(); - planBytes = new byte[length]; - in.readFully(planBytes); - - length = in.readInt(); - fragmentBytes = new byte[length]; - in.readFully(fragmentBytes); - - length = in.readInt(); - locations = new SplitLocationInfo[length]; - - for (int i = 0; i < length; ++i) { - locations[i] = new SplitLocationInfo(in.readUTF(), false); - } - - schema = new Schema(); - schema.readFields(in); - llapUser = in.readUTF(); - } - - @Override - public SplitLocationInfo[] getLocationInfo() throws IOException { - return locations; - } - - public String getLlapUser() { - return llapUser; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java deleted file mode 100644 index 83149ab..0000000 --- a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java +++ /dev/null @@ -1,103 +0,0 @@ -package org.apache.hadoop.hive.llap; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.common.security.JobTokenIdentifier; -import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.runtime.api.impl.TaskSpec; - -public class SubmitWorkInfo implements Writable { - - private TaskSpec taskSpec; - private ApplicationId fakeAppId; - private long creationTime; - - // This is used to communicate over the LlapUmbilicalProtocol. 
Not related to tokens used to - // talk to LLAP daemons itself via the security work. - private Token<JobTokenIdentifier> token; - - public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) { - this.taskSpec = taskSpec; - this.fakeAppId = fakeAppId; - this.token = createJobToken(); - this.creationTime = creationTime; - } - - // Empty constructor for writable etc. - public SubmitWorkInfo() { - } - - public TaskSpec getTaskSpec() { - return taskSpec; - } - - public ApplicationId getFakeAppId() { - return fakeAppId; - } - - public String getTokenIdentifier() { - return fakeAppId.toString(); - } - - public Token<JobTokenIdentifier> getToken() { - return token; - } - - public long getCreationTime() { - return creationTime; - } - - @Override - public void write(DataOutput out) throws IOException { - taskSpec.write(out); - out.writeLong(fakeAppId.getClusterTimestamp()); - out.writeInt(fakeAppId.getId()); - token.write(out); - out.writeLong(creationTime); - } - - @Override - public void readFields(DataInput in) throws IOException { - taskSpec = new TaskSpec(); - taskSpec.readFields(in); - long appIdTs = in.readLong(); - int appIdId = in.readInt(); - fakeAppId = ApplicationId.newInstance(appIdTs, appIdId); - token = new Token<>(); - token.readFields(in); - creationTime = in.readLong(); - } - - public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException { - DataOutputBuffer dob = new DataOutputBuffer(); - submitWorkInfo.write(dob); - return dob.getData(); - } - - public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException { - DataInputBuffer dib = new DataInputBuffer(); - dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length); - SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(); - submitWorkInfo.readFields(dib); - return submitWorkInfo; - } - - - private Token<JobTokenIdentifier> createJobToken() { - String tokenIdentifier = fakeAppId.toString(); - JobTokenIdentifier identifier = new 
JobTokenIdentifier(new Text( - tokenIdentifier)); - Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier, - new JobTokenSecretManager()); - sessionToken.setService(identifier.getJobId()); - return sessionToken; - } -}
