HIVE-19210: Create separate module for streaming ingest (Prasanth Jayachandran reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/67a8442b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/67a8442b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/67a8442b

Branch: refs/heads/branch-3
Commit: 67a8442bda1ecccd31c5dea0c3d6ba23198a742e
Parents: 473d1b9
Author: Prasanth Jayachandran <prasan...@apache.org>
Authored: Mon Apr 16 15:47:53 2018 -0700
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Mon Apr 16 15:47:58 2018 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/package-info.java   |   19 +
 itests/hive-unit/pom.xml                        |    4 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |   10 +-
 packaging/pom.xml                               |    5 +
 packaging/src/main/assembly/src.xml             |    1 +
 pom.xml                                         |    1 +
 streaming/pom.xml                               |  141 ++
 .../hive/streaming/AbstractRecordWriter.java    |  324 +++
 .../apache/hive/streaming/ConnectionError.java  |   35 +
 .../hive/streaming/DelimitedInputWriter.java    |  331 +++
 .../apache/hive/streaming/HeartBeatFailure.java |   33 +
 .../org/apache/hive/streaming/HiveEndPoint.java | 1117 +++++++++
 .../hive/streaming/ImpersonationFailed.java     |   25 +
 .../apache/hive/streaming/InvalidColumn.java    |   26 +
 .../apache/hive/streaming/InvalidPartition.java |   28 +
 .../org/apache/hive/streaming/InvalidTable.java |   38 +
 .../hive/streaming/InvalidTrasactionState.java  |   26 +
 .../hive/streaming/PartitionCreationFailed.java |   25 +
 .../hive/streaming/QueryFailedException.java    |   28 +
 .../org/apache/hive/streaming/RecordWriter.java |   43 +
 .../hive/streaming/SerializationError.java      |   26 +
 .../hive/streaming/StreamingConnection.java     |   57 +
 .../hive/streaming/StreamingException.java      |   28 +
 .../hive/streaming/StreamingIOFailure.java      |   31 +
 .../apache/hive/streaming/StrictJsonWriter.java |  162 ++
 .../hive/streaming/StrictRegexWriter.java       |  189 ++
 .../apache/hive/streaming/TransactionBatch.java |  125 +
 .../streaming/TransactionBatchUnAvailable.java  |   25 +
 .../apache/hive/streaming/TransactionError.java |   29 +
 .../java/org/apache/hive/streaming/package.html |  181 ++
 .../streaming/TestDelimitedInputWriter.java     |   73 +
 .../apache/hive/streaming/TestStreaming.java    | 2330 ++++++++++++++++++
 32 files changed, 5509 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
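
The new module is essentially a package move of the existing streaming ingest API, so downstream code swaps org.apache.hive.hcatalog.streaming imports for org.apache.hive.streaming and depends on org.apache.hive:hive-streaming instead of hive-hcatalog-streaming. For orientation only, a rough usage sketch against the relocated classes might look like the following; the metastore URI, database, table, partition values, agent string and records are made up, and the newConnection overload is assumed to carry over unchanged from the pre-move API.

import java.util.Arrays;

import org.apache.hive.streaming.DelimitedInputWriter;
import org.apache.hive.streaming.HiveEndPoint;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.TransactionBatch;

public class StreamingIngestExample {
  public static void main(String[] args)
      throws StreamingException, ClassNotFoundException, InterruptedException {
    // Hypothetical metastore URI, ACID table and partition values.
    HiveEndPoint endPoint = new HiveEndPoint("thrift://metastore-host:9083",
        "default", "alerts", Arrays.asList("2018", "04"));
    StreamingConnection connection = endPoint.newConnection(true, "example-agent");
    try {
      // Input fields map onto table columns "id" and "msg", in that order.
      DelimitedInputWriter writer =
          new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPoint, connection);
      TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
      try {
        txnBatch.beginNextTransaction();
        txnBatch.write("1,hello".getBytes());
        txnBatch.write("2,world".getBytes());
        txnBatch.commit();
      } finally {
        txnBatch.close();
      }
    } finally {
      connection.close();
    }
  }
}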


http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
new file mode 100644
index 0000000..36d6b13
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Deprecated // use org.apache.hive.streaming instead
+package org.apache.hive.hcatalog.streaming;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index f473d25..0357afd 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -76,8 +76,8 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hive.hcatalog</groupId>
-      <artifactId>hive-hcatalog-streaming</artifactId>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-streaming</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 5966740..b19aa23 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -70,11 +70,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.StreamingConnection;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.streaming.DelimitedInputWriter;
+import org.apache.hive.streaming.HiveEndPoint;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.TransactionBatch;
 import org.apache.orc.OrcConf;
 import org.junit.After;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/packaging/pom.xml
----------------------------------------------------------------------
diff --git a/packaging/pom.xml b/packaging/pom.xml
index 52ad6a2..8b03e2e 100644
--- a/packaging/pom.xml
+++ b/packaging/pom.xml
@@ -258,6 +258,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-streaming</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive.hcatalog</groupId>
       <artifactId>hive-hcatalog-streaming</artifactId>
       <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/packaging/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/packaging/src/main/assembly/src.xml b/packaging/src/main/assembly/src.xml
index 486fe52..c477194 100644
--- a/packaging/src/main/assembly/src.xml
+++ b/packaging/src/main/assembly/src.xml
@@ -97,6 +97,7 @@
         <include>spark-client/**/*</include>
         <include>storage-api/**/*</include>
         <include>standalone-metastore/**/*</include>
+        <include>streaming/**/*</include>
         <include>testutils/**/*</include>
         <include>vector-code-gen/**/*</include>
         <include>kryo-registrator/**/*</include>

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 76b11ef..190f74c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
     <module>serde</module>
     <module>service-rpc</module>
     <module>service</module>
+    <module>streaming</module>
     <module>llap-common</module>
     <module>llap-client</module>
     <module>llap-ext-client</module>

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 0000000..6da034a
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-streaming</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Streaming</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <optional>true</optional>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <optional>true</optional>
+      <version>3.3.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <optional>true</optional>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <optional>true</optional>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- test -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <resources>
+    </resources>
+    <plugins>
+      <!-- plugins are always listed in sorted order by groupId, artifactId -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
new file mode 100644
index 0000000..25998ae
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+
+
+public abstract class AbstractRecordWriter implements RecordWriter {
+  static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
+
+  private final HiveConf conf;
+  private final HiveEndPoint endPoint;
+  final Table tbl;
+
+  private final IMetaStoreClient msClient;
+  final List<Integer> bucketIds;
+  private ArrayList<RecordUpdater> updaters = null;
+
+  private final int totalBuckets;
+  /**
+   * Indicates whether target table is bucketed
+   */
+  private final boolean isBucketed;
+
+  private final Path partitionPath;
+
+  private final AcidOutputFormat<?,?> outf;
+  private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
+  private Long curBatchMinWriteId;
+  private Long curBatchMaxWriteId;
+
+  private static final class TableWriterPair {
+    private final Table tbl;
+    private final Path partitionPath;
+    TableWriterPair(Table t, Path p) {
+      tbl = t;
+      partitionPath = p;
+    }
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
+    throws ConnectionError, StreamingException {
+    this(endPoint, conf, null);
+  }
+  protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
+          throws StreamingException {
+    this.endPoint = endPoint2;
+    this.conf = conf!=null ? conf
+                : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
+    try {
+      msClient = HCatUtil.getHiveMetastoreClient(this.conf);
+      UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+      if (ugi == null) {
+        this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+        this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      } else {
+        TableWriterPair twp = ugi.doAs(
+          new PrivilegedExceptionAction<TableWriterPair>() {
+            @Override
+            public TableWriterPair run() throws Exception {
+              return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+                getPathForEndPoint(msClient, endPoint));
+            }
+          });
+        this.tbl = twp.tbl;
+        this.partitionPath = twp.partitionPath;
+      }
+      this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+      /**
+       *  For unbucketed tables we have exactly 1 RecordUpdater for each AbstractRecordWriter which
+       *  ends up writing to a file bucket_000000
+       * See also {@link #getBucket(Object)}
+       */
+      this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+      if(isBucketed) {
+        this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+        this.bucketFieldData = new Object[bucketIds.size()];
+      }
+      else {
+        bucketIds = Collections.emptyList();
+      }
+      String outFormatName = this.tbl.getSd().getOutputFormat();
+      outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+    } catch(InterruptedException e) {
+      throw new StreamingException(endPoint2.toString(), e);
+    } catch (MetaException | NoSuchObjectException e) {
+      throw new ConnectionError(endPoint2, e);
+    } catch (TException | ClassNotFoundException | IOException e) {
+      throw new StreamingException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * used to tag error msgs to provide some breadcrumbs
+   */
+  String getWatermark() {
+    return partitionPath + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
+  }
+  // return the column numbers of the bucketed columns
+  private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+    ArrayList<Integer> result =  new ArrayList<Integer>(bucketCols.size());
+    HashSet<String> bucketSet = new HashSet<String>(bucketCols);
+    for (int i = 0; i < cols.size(); i++) {
+      if( bucketSet.contains(cols.get(i).getName()) ) {
+        result.add(i);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Get the SerDe for the Objects created by {@link #encode}.  This is public so that test
+   * frameworks can use it.
+   * @return serde
+   * @throws SerializationError
+   */
+  public abstract AbstractSerDe getSerde() throws SerializationError;
+
+  /**
+   * Encode a record as an Object that Hive can read with the ObjectInspector associated with the
+   * serde returned by {@link #getSerde}.  This is public so that test frameworks can use it.
+   * @param record record to be deserialized
+   * @return deserialized record as an Object
+   * @throws SerializationError
+   */
+  public abstract Object encode(byte[] record) throws SerializationError;
+
+  protected abstract ObjectInspector[] getBucketObjectInspectors();
+  protected abstract StructObjectInspector getRecordObjectInspector();
+  protected abstract StructField[] getBucketStructFields();
+
+  // returns the bucket number to which the record belongs to
+  protected int getBucket(Object row) throws SerializationError {
+    if(!isBucketed) {
+      return 0;
+    }
+    ObjectInspector[] inspectors = getBucketObjectInspectors();
+    Object[] bucketFields = getBucketFields(row);
+    return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
+  }
+
+  @Override
+  public void flush() throws StreamingIOFailure {
+    try {
+      for (RecordUpdater updater : updaters) {
+        if (updater != null) {
+          updater.flush();
+        }
+      }
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Unable to flush recordUpdater", e);
+    }
+  }
+
+  @Override
+  public void clear() throws StreamingIOFailure {
+  }
+
+  /**
+   * Creates a new record updater for the new batch
+   * @param minWriteId smallest writeid in the batch
+   * @param maxWriteID largest writeid in the batch
+   * @throws StreamingIOFailure if failed to create record updater
+   */
+  @Override
+  public void newBatch(Long minWriteId, Long maxWriteID)
+          throws StreamingIOFailure, SerializationError {
+    curBatchMinWriteId = minWriteId;
+    curBatchMaxWriteId = maxWriteID;
+    updaters = new ArrayList<RecordUpdater>(totalBuckets);
+    for (int bucket = 0; bucket < totalBuckets; bucket++) {
+      updaters.add(bucket, null); //so that get(i) returns null rather than ArrayOutOfBounds
+    }
+  }
+
+  @Override
+  public void closeBatch() throws StreamingIOFailure {
+    boolean haveError = false;
+    for (RecordUpdater updater : updaters) {
+      if (updater != null) {
+        try {
+          //try not to leave any files open
+          updater.close(false);
+        } catch (Exception ex) {
+          haveError = true;
+          LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
+        }
+      }
+    }
+    updaters.clear();
+    if(haveError) {
+      throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark());
+    }
+  }
+
+  protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+          , StructObjectInspector recordObjInspector)
+          throws SerializationError {
+    ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
+
+    for (int i = 0; i < bucketIds.size(); i++) {
+      int bucketId = bucketIds.get(i);
+      result[i] =
+              recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector();
+    }
+    return result;
+  }
+
+
+  private Object[] getBucketFields(Object row) throws SerializationError {
+    StructObjectInspector recordObjInspector = getRecordObjectInspector();
+    StructField[] bucketStructFields = getBucketStructFields();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]);
+    }
+    return bucketFieldData;
+  }
+
+  private RecordUpdater createRecordUpdater(int bucketId, Long minWriteId, Long maxWriteID)
+          throws IOException, SerializationError {
+    try {
+      // Initialize table properties from the table parameters. This is required because the table
+      // may define certain table parameters that may be required while writing. The table parameter
+      // 'transactional_properties' is one such example.
+      Properties tblProperties = new Properties();
+      tblProperties.putAll(tbl.getParameters());
+      return  outf.getRecordUpdater(partitionPath,
+              new AcidOutputFormat.Options(conf)
+                      .inspector(getSerde().getObjectInspector())
+                      .bucket(bucketId)
+                      .tableProperties(tblProperties)
+                      .minimumWriteId(minWriteId)
+                      .maximumWriteId(maxWriteID)
+                      .statementId(-1)
+                      .finalDestination(partitionPath));
+    } catch (SerDeException e) {
+      throw new SerializationError("Failed to get object inspector from Serde "
+              + getSerde().getClass().getName(), e);
+    }
+  }
+
+  RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, SerializationError {
+    RecordUpdater recordUpdater = updaters.get(bucketId);
+    if (recordUpdater == null) {
+      try {
+        recordUpdater = createRecordUpdater(bucketId, curBatchMinWriteId, curBatchMaxWriteId);
+      } catch (IOException e) {
+        String errMsg = "Failed creating RecordUpdater for " + getWatermark();
+        LOG.error(errMsg, e);
+        throw new StreamingIOFailure(errMsg, e);
+      }
+      updaters.set(bucketId, recordUpdater);
+    }
+    return recordUpdater;
+  }
+
+  private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint)
+          throws StreamingException {
+    try {
+      String location;
+      if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) {
+        location = msClient.getTable(endPoint.database,endPoint.table)
+                .getSd().getLocation();
+      } else {
+        location = msClient.getPartition(endPoint.database, endPoint.table,
+                endPoint.partitionVals).getSd().getLocation();
+      }
+      return new Path(location);
+    } catch (TException e) {
+      throw new StreamingException(e.getMessage()
+              + ". Unable to get path for end point: "
+              + endPoint.partitionVals, e);
+    }
+  }
+}
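
AbstractRecordWriter leaves the serde plumbing (getSerde, encode and the three inspector accessors) plus write() to subclasses, and manages one RecordUpdater per bucket itself. A hedged skeleton of a custom writer, modeled on DelimitedInputWriter below: the class name and the idea of handing in an already-initialized serde are illustrative inventions, not part of this commit, and the subclass has to live in org.apache.hive.streaming because bucketIds and getRecordUpdater are package-private.

package org.apache.hive.streaming; // required: bucketIds and getRecordUpdater are package-private

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;

public class ExampleByteWriter extends AbstractRecordWriter {
  private final AbstractSerDe serde;                // assumed to be initialized for the target table
  private final StructObjectInspector recordInspector;
  private final ObjectInspector[] bucketInspectors;
  private final StructField[] bucketFields;

  public ExampleByteWriter(AbstractSerDe initializedSerde, HiveEndPoint endPoint,
                           HiveConf conf, StreamingConnection conn) throws StreamingException {
    super(endPoint, conf, conn);
    this.serde = initializedSerde;
    try {
      this.recordInspector = (StructObjectInspector) initializedSerde.getObjectInspector();
    } catch (SerDeException e) {
      throw new SerializationError("Unable to get ObjectInspector from serde", e);
    }
    // bucketIds was populated by the base class from the table's bucketing spec.
    this.bucketInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordInspector);
    this.bucketFields = new StructField[bucketIds.size()];
    List<? extends StructField> allFields = recordInspector.getAllStructFieldRefs();
    for (int i = 0; i < bucketIds.size(); i++) {
      bucketFields[i] = allFields.get(bucketIds.get(i));
    }
  }

  @Override
  public Object encode(byte[] record) throws SerializationError {
    try {
      return serde.deserialize(new BytesWritable(record));  // raw bytes -> row object
    } catch (SerDeException e) {
      throw new SerializationError("Unable to convert byte[] record into Object", e);
    }
  }

  @Override
  public void write(long writeId, byte[] record) throws SerializationError, StreamingIOFailure {
    try {
      Object row = encode(record);
      // The base class lazily creates and caches one RecordUpdater per bucket.
      getRecordUpdater(getBucket(row)).insert(writeId, row);
    } catch (IOException e) {
      throw new StreamingIOFailure("Error writing record for write id " + writeId, e);
    }
  }

  @Override
  public AbstractSerDe getSerde() {
    return serde;
  }

  @Override
  protected StructObjectInspector getRecordObjectInspector() {
    return recordInspector;
  }

  @Override
  protected ObjectInspector[] getBucketObjectInspectors() {
    return bucketInspectors;
  }

  @Override
  protected StructField[] getBucketStructFields() {
    return bucketFields;
  }
}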

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionError.java b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
new file mode 100644
index 0000000..668bffb
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ConnectionError extends StreamingException {
+
+  public ConnectionError(String msg) {
+    super(msg);
+  }
+
+  public ConnectionError(String msg, Exception innerEx) {
+    super(msg, innerEx);
+  }
+
+  public ConnectionError(HiveEndPoint endPoint, Exception innerEx) {
+    super("Error connecting to " + endPoint +
+        (innerEx == null ? "" : ": " + innerEx.getMessage()), innerEx);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
new file mode 100644
index 0000000..898b3f9
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming Writer handles delimited input (e.g. CSV).
+ * Delimited input is parsed and reordered to match the column order in the table.
+ * Uses LazySimpleSerDe to process delimited input.
+ */
+public class DelimitedInputWriter extends AbstractRecordWriter {
+  private final boolean reorderingNeeded;
+  private String delimiter;
+  private char serdeSeparator;
+  private int[] fieldToColMapping;
+  private final ArrayList<String> tableColumns;
+  private LazySimpleSerDe serde = null;
+
+  private final LazySimpleStructObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
+  static final private Logger LOG = LoggerFactory.getLogger(DelimitedInputWriter.class.getName());
+
+  /** Constructor. Uses default separator of the LazySimpleSerde
+   * @param colNamesForFields Column name assignment for input fields. nulls or empty
+   *                          strings in the array indicate the fields to be skipped
+   * @param delimiter input field delimiter
+   * @param endPoint Hive endpoint
+   * @throws ConnectionError Problem talking to Hive
+   * @throws ClassNotFoundException Serde class not found
+   * @throws SerializationError Serde initialization/interaction failed
+   * @throws StreamingException Problem acquiring file system path for partition
+   * @throws InvalidColumn any element in colNamesForFields refers to a non-existing column
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, StreamingConnection conn)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+      InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, conn);
+  }
+ /** Constructor. Uses default separator of the LazySimpleSerde
+  * @param colNamesForFields Column name assignment for input fields. nulls or empty
+  *                          strings in the array indicate the fields to be skipped
+  * @param delimiter input field delimiter
+  * @param endPoint Hive endpoint
+  * @param conf a Hive conf object. Can be null if not using advanced hive settings.
+  * @throws ConnectionError Problem talking to Hive
+  * @throws ClassNotFoundException Serde class not found
+  * @throws SerializationError Serde initialization/interaction failed
+  * @throws StreamingException Problem acquiring file system path for partition
+  * @throws InvalidColumn any element in colNamesForFields refers to a non-existing column
+  */
+   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+          throws ClassNotFoundException, ConnectionError, SerializationError,
+                 InvalidColumn, StreamingException {
+     this(colNamesForFields, delimiter, endPoint, conf,
+       (char) LazySerDeParameters.DefaultSeparators[0], conn);
+   }
+  /**
+   * Constructor. Allows overriding separator of the LazySimpleSerde
+   * @param colNamesForFields Column name assignment for input fields
+   * @param delimiter input field delimiter
+   * @param endPoint Hive endpoint
+   * @param conf a Hive conf object. Set to null if not using advanced hive settings.
+   * @param serdeSeparator separator used when encoding data that is fed into the
+   *                             LazySimpleSerde. Ensure this separator does not occur
+   *                             in the field data
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError Problem talking to Hive
+   * @throws ClassNotFoundException Serde class not found
+   * @throws SerializationError Serde initialization/interaction failed
+   * @throws StreamingException Problem acquiring file system path for partition
+   * @throws InvalidColumn any element in colNamesForFields refers to a non-existing column
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
+          throws ClassNotFoundException, ConnectionError, SerializationError,
+                 InvalidColumn, StreamingException {
+    super(endPoint, conf, conn);
+    this.tableColumns = getCols(tbl);
+    this.serdeSeparator = serdeSeparator;
+    this.delimiter = delimiter;
+    this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+    this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
+    LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
+    this.serdeSeparator = serdeSeparator;
+    this.serde = createSerde(tbl, conf, serdeSeparator);
+
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      this.recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf,
+      (char) LazySerDeParameters.DefaultSeparators[0], null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+    throws ClassNotFoundException, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+  }
+
+  private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
+    return !( delimiter.equals(String.valueOf(getSerdeSeparator()))
+            && areFieldsInColOrder(fieldToColMapping)
+            && tableColumns.size()>=fieldToColMapping.length );
+  }
+
+  private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
+    for(int i=0; i<fieldToColMapping.length; ++i) {
+      if(fieldToColMapping[i]!=i) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
+          throws InvalidColumn {
+    int[] result = new int[ colNamesForFields.length ];
+    for(int i=0; i<colNamesForFields.length; ++i) {
+      result[i] = -1;
+    }
+    int i=-1, fieldLabelCount=0;
+    for( String col : colNamesForFields ) {
+      ++i;
+      if(col == null) {
+        continue;
+      }
+      if( col.trim().isEmpty() ) {
+        continue;
+      }
+      ++fieldLabelCount;
+      int loc = tableColNames.indexOf(col);
+      if(loc == -1) {
+        throw new InvalidColumn("Column '" + col + "' not found in table for input field " + (i + 1));
+      }
+      result[i] = loc;
+    }
+    if(fieldLabelCount>tableColNames.size()) {
+      throw new InvalidColumn("Number of field names exceeds the number of columns in table");
+    }
+    return result;
+  }
+
+  // Reorder fields in record based on the order of columns in the table
+  protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
+    if(!reorderingNeeded) {
+      return record;
+    }
+    String[] reorderedFields = new String[getTableColumns().size()];
+    String decoded = new String(record);
+    String[] fields = decoded.split(delimiter,-1);
+    for (int i=0; i<fieldToColMapping.length; ++i) {
+      int newIndex = fieldToColMapping[i];
+      if(newIndex != -1) {
+        reorderedFields[newIndex] = fields[i];
+      }
+    }
+    return join(reorderedFields, getSerdeSeparator());
+  }
+
+  // handles nulls in items[]
+  // TODO: perhaps can be made more efficient by creating a byte[] directly
+  private static byte[] join(String[] items, char separator) {
+    StringBuilder buff = new StringBuilder(100);
+    if(items.length == 0)
+      return "".getBytes();
+    int i=0;
+    for(; i<items.length-1; ++i) {
+      if(items[i]!=null) {
+        buff.append(items[i]);
+      }
+      buff.append(separator);
+    }
+    if(items[i]!=null) {
+      buff.append(items[i]);
+    }
+    return buff.toString().getBytes();
+  }
+
+  protected ArrayList<String> getTableColumns() {
+    return tableColumns;
+  }
+
+  @Override
+  public void write(long writeId, byte[] record)
+          throws SerializationError, StreamingIOFailure {
+    try {
+      byte[] orderedFields = reorderFields(record);
+      Object encodedRow = encode(orderedFields);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction write id ("
+              + writeId + ")", e);
+    }
+  }
+
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  protected LazySimpleStructObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+  @Override
+  public Object encode(byte[] record) throws SerializationError {
+    try {
+      BytesWritable blob = new BytesWritable();
+      blob.set(record, 0, record.length);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+  /**
+   * Creates the LazySimpleSerDe used to decode delimited records
+   * @param tbl table whose metadata is used to initialize the serde
+   * @return the initialized serde
+   * @throws SerializationError if serde could not be initialized
+   */
+  protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty("field.delim", String.valueOf(serdeSeparator));
+      LazySimpleSerDe serde = new LazySimpleSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde", e);
+    }
+  }
+
+  private ArrayList<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      colNames.add(col.getName().toLowerCase());
+    }
+    return  colNames;
+  }
+
+  public char getSerdeSeparator() {
+    return serdeSeparator;
+  }
+}
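
The field-to-column reordering contract above is easiest to see with a tiny example. The table columns and field labels below are hypothetical, and since getFieldReordering is package-private (@VisibleForTesting) a snippet like this only compiles from inside org.apache.hive.streaming, which is presumably how TestDelimitedInputWriter exercises it.

package org.apache.hive.streaming; // getFieldReordering is package-private

import java.util.Arrays;
import java.util.List;

public class FieldReorderingExample {
  public static void main(String[] args) throws InvalidColumn {
    // Hypothetical table columns (id, msg, city); a null label skips the first input field.
    String[] colNamesForFields = {null, "id", "msg"};
    List<String> tableCols = Arrays.asList("id", "msg", "city");
    int[] mapping = DelimitedInputWriter.getFieldReordering(colNamesForFields, tableCols);
    // Prints [-1, 0, 1]: -1 marks a skipped input field, other entries are table column indexes,
    // so reorderFields() rewrites an input line "ignored,1,hello" to "1<sep>hello<sep>"
    // before the serde decodes it.
    System.out.println(Arrays.toString(mapping));
  }
}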

http://git-wip-us.apache.org/repos/asf/hive/blob/67a8442b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
new file mode 100644
index 0000000..b1f9520
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class HeartBeatFailure extends StreamingException {
+  private Collection<Long> abortedTxns;
+  private Collection<Long> nosuchTxns;
+
+  public HeartBeatFailure(Collection<Long> abortedTxns, Set<Long> nosuchTxns) {
+    super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns);
+    this.abortedTxns = abortedTxns;
+    this.nosuchTxns = nosuchTxns;
+  }
+}
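
HeartBeatFailure typically surfaces when a long-running client lets the locks behind an open transaction batch time out. A hedged sketch of the usual countermeasure follows: TransactionBatch.heartbeat() is assumed to carry over unchanged from the pre-move org.apache.hive.hcatalog.streaming interface (the TransactionBatch source is not shown in this mail), and the one-minute interval is an arbitrary illustration.

import java.util.List;

import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.TransactionBatch;

public final class HeartbeatingWrite {
  /** Writes all records in one transaction, heart-beating roughly once a minute. */
  static void writeWithHeartbeat(TransactionBatch batch, List<byte[]> records)
      throws StreamingException, InterruptedException {
    batch.beginNextTransaction();
    long lastBeat = System.currentTimeMillis();
    for (byte[] record : records) {
      batch.write(record);
      if (System.currentTimeMillis() - lastBeat > 60_000L) {
        batch.heartbeat(); // propagates HeartBeatFailure if the batch's txns were aborted or expired
        lastBeat = System.currentTimeMillis();
      }
    }
    batch.commit();
  }
}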
