http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java b/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
new file mode 100644
index 0000000..b04e137
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
@@ -0,0 +1,1117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Information about the hive end point (i.e. table or partition) to write to.
+ * A light weight object that does NOT internally hold on to resources such as
+ * network connections. It can be stored in hashed containers such as sets and hash tables.
+ */
+public class HiveEndPoint {
+  public final String metaStoreUri;
+  public final String database;
+  public final String table;
+  public final ArrayList<String> partitionVals;
+
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveEndPoint.class.getName());
+
+  /**
+   *
+   * @param metaStoreUri   URI of the metastore to connect to eg: thrift://localhost:9083
+   * @param database       Name of the Hive database
+   * @param table          Name of table to stream to
+   * @param partitionVals  Indicates the specific partition to stream to. Can be null or empty List
+   *                       if streaming to a table without partitions. The order of values in this
+   *                       list must correspond exactly to the order of partition columns specified
+   *                       during the table creation. E.g. For a table partitioned by
+   *                       (continent string, country string), partitionVals could be the list
+   *                       ("Asia", "India").
+   */
+  public HiveEndPoint(String metaStoreUri
+          , String database, String table, List<String> partitionVals) {
+    this.metaStoreUri = metaStoreUri;
+    if (database==null) {
+      throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
+    }
+    this.database = database;
+    if (table==null) {
+      throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
+    }
+    this.table = table;
+    this.partitionVals = partitionVals==null ? new ArrayList<String>()
+                                             : new ArrayList<String>( partitionVals );
+  }
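+
+  // Editor's note - an illustrative construction (hypothetical database/table names,
+  // not part of this patch):
+  //   HiveEndPoint ep = new HiveEndPoint("thrift://localhost:9083", "mydb", "alerts",
+  //       Arrays.asList("Asia", "India"));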
+
+
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, String)}
+   */
+  @Deprecated
+  public StreamingConnection newConnection(final boolean createPartIfNotExists)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, null, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, String)}
+   */
+  @Deprecated
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+   */
+  @Deprecated
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+                                           final UserGroupInformation authenticatedUser)
+    throws ConnectionError, InvalidPartition,
+    InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, authenticatedUser, null);
+  }
+  /**
+   * Acquire a new connection to MetaStore for streaming
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
+   * @return a new connection to the endpoint
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, String agentInfo)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, null, null, agentInfo);
+  }
+
+  /**
+   * Acquire a new connection to MetaStore for streaming
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param conf HiveConf object, set it to null if not using advanced hive settings.
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
+   * @return a new connection to the endpoint
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf, String agentInfo)
+          throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+          , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, null, agentInfo);
+  }
+
+  /**
+   * Acquire a new connection to MetaStore for streaming. To connect using Kerberos,
+   *   'authenticatedUser' argument should have been used to do a kerberos login.  Additionally the
+   *   'hive.metastore.kerberos.principal' setting should be set correctly either in hive-site.xml or
+   *    in the 'conf' argument (if not null). If using hive-site.xml, it should be in classpath.
+   *
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param conf               HiveConf object to be used for the connection. Can be null.
+   * @param authenticatedUser  UserGroupInformation object obtained from successful authentication.
+   *                           Uses non-secure mode if this argument is null.
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
+   * @return a new connection to the endpoint
+   * @throws ConnectionError if there is a connection problem
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'username'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+                                           final UserGroupInformation authenticatedUser, final String agentInfo)
+          throws ConnectionError, InvalidPartition,
+               InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+
+    if( authenticatedUser==null ) {
+      return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
+    }
+
+    try {
+      return authenticatedUser.doAs (
+             new PrivilegedExceptionAction<StreamingConnection>() {
+                @Override
+                public StreamingConnection run()
+                        throws ConnectionError, InvalidPartition, InvalidTable
+                        , PartitionCreationFailed {
+                  return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
+                }
+             }
+      );
+    } catch (IOException e) {
+      throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e);
+    }
+  }
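+
+  // Editor's note - a minimal sketch of a Kerberos connection, assuming a prior keytab
+  // login (the principal and keytab path below are hypothetical):
+  //   UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+  //       "hive/host@EXAMPLE.COM", "/etc/security/keytabs/hive.keytab");
+  //   StreamingConnection conn = ep.newConnection(true, null, ugi, "my-agent");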
+
+  private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
+                                               boolean createPartIfNotExists, HiveConf conf, String agentInfo)
+          throws ConnectionError, InvalidPartition, InvalidTable
+          , PartitionCreationFailed {
+    return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
+  }
+
+  private static UserGroupInformation getUserGroupInfo(String user)
+          throws ImpersonationFailed {
+    try {
+      return UserGroupInformation.createProxyUser(
+              user, UserGroupInformation.getLoginUser());
+    } catch (IOException e) {
+      LOG.error("Unable to get UserGroupInfo for user : " + user, e);
+      throw new ImpersonationFailed(user,e);
+    }
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    HiveEndPoint endPoint = (HiveEndPoint) o;
+
+    if (database != null
+            ? !database.equals(endPoint.database)
+            : endPoint.database != null ) {
+      return false;
+    }
+    if (metaStoreUri != null
+            ? !metaStoreUri.equals(endPoint.metaStoreUri)
+            : endPoint.metaStoreUri != null ) {
+      return false;
+    }
+    if (!partitionVals.equals(endPoint.partitionVals)) {
+      return false;
+    }
+    if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0;
+    result = 31 * result + (database != null ? database.hashCode() : 0);
+    result = 31 * result + (table != null ? table.hashCode() : 0);
+    result = 31 * result + partitionVals.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "{" +
+            "metaStoreUri='" + metaStoreUri + '\'' +
+            ", database='" + database + '\'' +
+            ", table='" + table + '\'' +
+            ", partitionVals=" + partitionVals + " }";
+  }
+
+
+  private static class ConnectionImpl implements StreamingConnection {
+    private final IMetaStoreClient msClient;
+    private final IMetaStoreClient heartbeaterMSClient;
+    private final HiveEndPoint endPt;
+    private final UserGroupInformation ugi;
+    private final String username;
+    private final boolean secureMode;
+    private final String agentInfo;
+
+    /**
+     * @param endPoint end point to connect to
+     * @param ugi on behalf of whom streaming is done. cannot be null
+     * @param conf HiveConf object
+     * @param createPart create the partition if it does not exist
+     * @throws ConnectionError if there is trouble connecting
+     * @throws InvalidPartition if specified partition does not exist (and createPart=false)
+     * @throws InvalidTable if specified table does not exist
+     * @throws PartitionCreationFailed if createPart=true and not able to create partition
+     */
+    private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
+                           HiveConf conf, boolean createPart, String agentInfo)
+            throws ConnectionError, InvalidPartition, InvalidTable
+                   , PartitionCreationFailed {
+      this.endPt = endPoint;
+      this.ugi = ugi;
+      this.agentInfo = agentInfo;
+      this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
+      if (conf==null) {
+        conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
+      }
+      else {
+        overrideConfSettings(conf);
+      }
+      this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
+      this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+      // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+      // isolated from the other transaction related RPC calls.
+      this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
+      checkEndPoint(endPoint, msClient);
+      if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
+        createPartitionIfNotExists(endPoint, msClient, conf);
+      }
+    }
+
+    /**
+     * Checks the validity of endpoint
+     * @param endPoint the HiveEndPoint to be checked
+     * @param msClient the metastore client
+     * @throws InvalidTable
+     */
+    private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
+        throws InvalidTable, ConnectionError {
+      Table t;
+      try {
+        t = msClient.getTable(endPoint.database, endPoint.table);
+      } catch (Exception e) {
+        LOG.warn("Unable to check the endPoint: " + endPoint, e);
+        throw new InvalidTable(endPoint.database, endPoint.table, e);
+      }
+      // 1 - check that the table is Acid
+      if (!AcidUtils.isFullAcidTable(t)) {
+        LOG.error("HiveEndPoint " + endPoint + " must use an acid table");
+        throw new InvalidTable(endPoint.database, endPoint.table, "is not an Acid table");
+      }
+
+      // 2 - check if partitionvals are legitimate
+      if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty()
+          && endPoint.partitionVals.isEmpty()) {
+        // Invalid if table is partitioned, but endPoint's partitionVals is empty
+        String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " +
+            "partitioned table";
+        LOG.error(errMsg);
+        throw new ConnectionError(errMsg);
+      }
+      if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty())
+          && !endPoint.partitionVals.isEmpty()) {
+        // Invalid if table is not partitioned, but endPoint's partitionVals is not empty
+        String errMsg = "HiveEndPoint " + endPoint + " specifies partitions for unpartitioned table";
+        LOG.error(errMsg);
+        throw new ConnectionError(errMsg);
+      }
+    }
+
+    /**
+     * Close connection
+     */
+    @Override
+    public void close() {
+      if (ugi==null) {
+        msClient.close();
+        heartbeaterMSClient.close();
+        return;
+      }
+      try {
+        ugi.doAs (
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                msClient.close();
+                heartbeaterMSClient.close();
+                return null;
+              }
+            } );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, 
exception);
+        }
+      } catch (IOException e) {
+        LOG.error("Error closing connection to " + endPt, e);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted when closing connection to " + endPt, e);
+      }
+    }
+
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return ugi;
+    }
+
+    /**
+     * Acquires a new batch of transactions from Hive.
+     *
+     * @param numTransactions is a hint from client indicating how many transactions client needs.
+     * @param recordWriter  Used to write record. The same writer instance can
+     *                      be shared with another TransactionBatch (to the same endpoint)
+     *                      only after the first TransactionBatch has been closed.
+     *                      Writer will be closed when the TransactionBatch is closed.
+     * @return a batch of transactions
+     * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+     * @throws ImpersonationFailed failed to run command as proxyUser
+     * @throws InterruptedException
+     */
+    @Override
+    public TransactionBatch fetchTransactionBatch(final int numTransactions,
+                                                      final RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
+                  , InterruptedException {
+      if (ugi==null) {
+        return fetchTransactionBatchImpl(numTransactions, recordWriter);
+      }
+      try {
+        return ugi.doAs (
+                new PrivilegedExceptionAction<TransactionBatch>() {
+                  @Override
+                  public TransactionBatch run() throws StreamingException, InterruptedException {
+                    return fetchTransactionBatchImpl(numTransactions, recordWriter);
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
+                + "' when acquiring Transaction Batch on endPoint " + endPt, e);
+      }
+    }
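+
+    // Editor's note - typical batch lifecycle as driven by a client (a sketch; 'writer'
+    // is some RecordWriter implementation and 'conn' a StreamingConnection):
+    //   TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
+    //   while (batch.remainingTransactions() > 0) {
+    //     batch.beginNextTransaction();
+    //     batch.write("a,b,c".getBytes());
+    //     batch.commit();
+    //   }
+    //   batch.close();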
+
+    private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
+                                                  RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
+          heartbeaterMSClient, recordWriter, agentInfo);
+    }
+
+
+    private static void createPartitionIfNotExists(HiveEndPoint ep,
+                                                   IMetaStoreClient msClient, HiveConf conf)
+            throws InvalidTable, PartitionCreationFailed {
+      if (ep.partitionVals.isEmpty()) {
+        return;
+      }
+      SessionState localSession = null;
+      if(SessionState.get() == null) {
+        localSession = SessionState.start(new CliSessionState(conf));
+      }
+      IDriver driver = DriverFactory.newDriver(conf);
+
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Attempting to create partition (if not existent) " + ep);
+        }
+
+        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
+                .getPartitionKeys();
+        runDDL(driver, "use " + ep.database);
+        String query = "alter table " + ep.table + " add if not exists 
partition "
+                + partSpecStr(partKeys, ep.partitionVals);
+        runDDL(driver, query);
+      } catch (MetaException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } catch (NoSuchObjectException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new InvalidTable(ep.database, ep.table);
+      } catch (TException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } catch (QueryFailedException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } finally {
+        driver.close();
+        try {
+          if(localSession != null) {
+            localSession.close();
+          }
+        } catch (IOException e) {
+          LOG.warn("Error closing SessionState used to run Hive DDL.");
+        }
+      }
+    }
+
+    private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Running Hive Query: " + sql);
+      }
+      driver.run(sql);
+      return true;
+    }
+
+    private static String partSpecStr(List<FieldSchema> partKeys, 
ArrayList<String> partVals) {
+      if (partKeys.size()!=partVals.size()) {
+        throw new IllegalArgumentException("Partition values:" + partVals +
+                ", does not match the partition Keys in table :" + partKeys );
+      }
+      StringBuilder buff = new StringBuilder(partKeys.size()*20);
+      buff.append(" ( ");
+      int i=0;
+      for (FieldSchema schema : partKeys) {
+        buff.append(schema.getName());
+        buff.append("='");
+        buff.append(partVals.get(i));
+        buff.append("'");
+        if (i!=partKeys.size()-1) {
+          buff.append(",");
+        }
+        ++i;
+      }
+      buff.append(" )");
+      return buff.toString();
+    }
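+
+    // Editor's note: for partition keys (continent, country) and values ("Asia", "India"),
+    // partSpecStr returns " ( continent='Asia',country='India' )", so the DDL issued by
+    // createPartitionIfNotExists above is:
+    //   alter table <table> add if not exists partition  ( continent='Asia',country='India' )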
+
+    private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode)
+            throws ConnectionError {
+
+      if (endPoint.metaStoreUri!= null) {
+        conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
+      }
+      if(secureMode) {
+        conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL,true);
+      }
+      try {
+        return HCatUtil.getHiveMetastoreClient(conf);
+      } catch (MetaException e) {
+        throw new ConnectionError("Error connecting to Hive Metastore URI: "
+                + endPoint.metaStoreUri + ". " + e.getMessage(), e);
+      } catch (IOException e) {
+        throw new ConnectionError("Error connecting to Hive Metastore URI: "
+            + endPoint.metaStoreUri + ". " + e.getMessage(), e);
+      }
+    }
+  } // class ConnectionImpl
+
+  private static class TransactionBatchImpl implements TransactionBatch {
+    private final String username;
+    private final UserGroupInformation ugi;
+    private final HiveEndPoint endPt;
+    private final IMetaStoreClient msClient;
+    private final IMetaStoreClient heartbeaterMSClient;
+    private final RecordWriter recordWriter;
+    private final List<TxnToWriteId> txnToWriteIds;
+
+    //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking"
+    private volatile int currentTxnIndex = -1;
+    private final String partNameForLock;
+    //volatile because heartbeat() may be in a "different" thread
+    private volatile TxnState state;
+    private LockRequest lockRequest = null;
+    /**
+     * once any operation on this batch encounters a system exception
+     * (e.g. IOException on write) it's safest to assume that we can't write to the
+     * file backing this batch any more.  This guards important public methods
+     */
+    private volatile boolean isClosed = false;
+    private final String agentInfo;
+    /**
+     * Tracks the state of each transaction
+     */
+    private final TxnState[] txnStatus;
+    /**
+     * ID of the last txn used by {@link #beginNextTransactionImpl()}
+     */
+    private long lastTxnUsed;
+
+    /**
+     * Represents a batch of transactions acquired from MetaStore
+     *
+     * @throws StreamingException if failed to create new RecordUpdater for batch
+     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+     */
+    private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
+        final int numTxns, final IMetaStoreClient msClient,
+        final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo)
+        throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+      boolean success = false;
+      try {
+        if ( endPt.partitionVals!=null   &&   !endPt.partitionVals.isEmpty() ) {
+          Table tableObj = msClient.getTable(endPt.database, endPt.table);
+          List<FieldSchema> partKeys = tableObj.getPartitionKeys();
+          partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals);
+        } else {
+          partNameForLock = null;
+        }
+        this.username = user;
+        this.ugi = ugi;
+        this.endPt = endPt;
+        this.msClient = msClient;
+        this.heartbeaterMSClient = heartbeaterMSClient;
+        this.recordWriter = recordWriter;
+        this.agentInfo = agentInfo;
+
+        List<Long> txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+        txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds, ugi);
+        assert(txnToWriteIds.size() == numTxns);
+
+        txnStatus = new TxnState[numTxns];
+        for(int i = 0; i < txnStatus.length; i++) {
+          assert(txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
+          txnStatus[i] = TxnState.OPEN;//Open matches Metastore state
+        }
+        this.state = TxnState.INACTIVE;
+
+        // The Write Ids returned for the transaction batch are also sequential
+        recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns-1).getWriteId());
+        success = true;
+      } catch (TException e) {
+        throw new TransactionBatchUnAvailable(endPt, e);
+      } catch (IOException e) {
+        throw new TransactionBatchUnAvailable(endPt, e);
+      }
+      finally {
+        //clean up if above throws
+        markDead(success);
+      }
+    }
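+
+    // Editor's note - a hypothetical walk-through of the constructor above: numTxns == 3
+    // could open txn ids [100, 101, 102] mapped to consecutive write ids [8, 9, 10] for
+    // this table, in which case recordWriter.newBatch(8, 10) is called.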
+
+    private List<Long> openTxnImpl(final IMetaStoreClient msClient, final 
String user, final int numTxns, UserGroupInformation ugi)
+            throws IOException, TException,  InterruptedException {
+      if(ugi==null) {
+        return  msClient.openTxns(user, numTxns).getTxn_ids();
+      }
+      return (List<Long>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          return msClient.openTxns(user, numTxns).getTxn_ids();
+        }
+      });
+    }
+
+    private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient 
msClient,
+                                                    final List<Long> txnIds, 
UserGroupInformation ugi)
+            throws IOException, TException,  InterruptedException {
+      if(ugi==null) {
+        return  msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, 
endPt.table);
+      }
+      return (List<TxnToWriteId>) ugi.doAs(new 
PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, 
endPt.table);
+        }
+      });
+    }
+
+    @Override
+    public String toString() {
+      if (txnToWriteIds==null || txnToWriteIds.isEmpty()) {
+        return "{}";
+      }
+      StringBuilder sb = new StringBuilder(" TxnStatus[");
+      for(TxnState state : txnStatus) {
+        //'state' should not be null - future proofing
+        sb.append(state == null ? "N" : state);
+      }
+      sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
+      return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+              + "/" + txnToWriteIds.get(0).getWriteId()
+              + "..."
+              + txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId()
+              + "/" + txnToWriteIds.get(txnToWriteIds.size()-1).getWriteId()
+              + "] on endPoint = " + endPt + "; " + sb;
+    }
+
+    /**
+     * Activate the next available transaction in the current transaction batch
+     * @throws TransactionError failed to switch to next transaction
+     */
+    @Override
+    public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
+            InterruptedException {
+      checkIsClosed();
+      if (ugi==null) {
+        beginNextTransactionImpl();
+        return;
+      }
+      try {
+        ugi.doAs (
+              new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws TransactionError {
+                  beginNextTransactionImpl();
+                  return null;
+                }
+              }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed switching to next Txn as user '" 
+ username +
+                "' in Txn batch :" + this, e);
+      }
+    }
+
+    private void beginNextTransactionImpl() throws TransactionError {
+      state = TxnState.INACTIVE;//clear state from previous txn
+
+      if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
+        throw new InvalidTrasactionState("No more transactions available in" +
+                " current batch for end point : " + endPt);
+      }
+      ++currentTxnIndex;
+      state = TxnState.OPEN;
+      lastTxnUsed = getCurrentTxnId();
+      lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo);
+      try {
+        LockResponse res = msClient.lock(lockRequest);
+        if (res.getState() != LockState.ACQUIRED) {
+          throw new TransactionError("Unable to acquire lock on " + endPt);
+        }
+      } catch (TException e) {
+        throw new TransactionError("Unable to acquire lock on " + endPt, e);
+      }
+    }
+
+    /**
+     * Get Id of currently open transaction.
+     * @return -1 if there is no open TX
+     */
+    @Override
+    public Long getCurrentTxnId() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getTxnId();
+      }
+      return -1L;
+    }
+
+    /**
+     * Get write Id of currently open transaction.
+     * @return -1 if there is no open TX
+     */
+    @Override
+    public Long getCurrentWriteId() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getWriteId();
+      }
+      return -1L;
+    }
+
+    /**
+     * Get state of current transaction.
+     * @return state of the current transaction
+     */
+    @Override
+    public TxnState getCurrentTransactionState() {
+      return state;
+    }
+
+    /**
+     * Remaining transactions are the ones that are not committed or aborted or active.
+     * Active transaction is not considered part of remaining txns.
+     * @return number of transactions remaining this batch.
+     */
+    @Override
+    public int remainingTransactions() {
+      if (currentTxnIndex>=0) {
+        return txnToWriteIds.size() - currentTxnIndex -1;
+      }
+      return txnToWriteIds.size();
+    }
+
+
+    /**
+     *  Write record using RecordWriter
+     * @param record  the data to be written
+     * @throws StreamingIOFailure I/O failure
+     * @throws SerializationError  serialization error
+     * @throws ImpersonationFailed error writing on behalf of proxyUser
+     * @throws InterruptedException
+     */
+    @Override
+    public void write(final byte[] record)
+            throws StreamingException, InterruptedException {
+      write(Collections.singletonList(record));
+    }
+    private void checkIsClosed() throws IllegalStateException {
+      if(isClosed) {
+        throw new IllegalStateException("TransactionBatch " + toString() + " 
has been closed()");
+      }
+    }
+    /**
+     * A transaction batch opens a single HDFS file and writes multiple transactions to it.  If there is any issue
+     * with the write, we can't continue to write to the same file any more as it may be corrupted now (at the tail).
+     * This ensures that a client can't ignore these failures and continue to write.
+     */
+    private void markDead(boolean success) {
+      if(success) {
+        return;
+      }
+      isClosed = true;//also ensures that heartbeat() is no-op since client is likely doing it async
+      try {
+        abort(true);//abort all remaining txns
+      }
+      catch(Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      }
+      try {
+        closeImpl();
+      }
+      catch (Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      }
+    }
+
+
+    /**
+     *  Write records using RecordWriter
+     * @param records collection of rows to be written
+     * @throws StreamingException  serialization error
+     * @throws ImpersonationFailed error writing on behalf of proxyUser
+     * @throws InterruptedException
+     */
+    @Override
+    public void write(final Collection<byte[]> records)
+            throws StreamingException, InterruptedException,
+            ImpersonationFailed {
+      checkIsClosed();
+      boolean success = false;
+      try {
+        if (ugi == null) {
+          writeImpl(records);
+        } else {
+          ugi.doAs(
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws StreamingException {
+                writeImpl(records);
+                return null;
+              }
+            }
+          );
+        }
+        success = true;
+      } catch(SerializationError ex) {
+        //this exception indicates that a {@code record} could not be parsed and the
+        //caller can decide whether to drop it or send it to dead letter queue.
+        //rolling back the txn and retrying won't help since the tuple will be exactly the same
+        //when it's replayed.
+        success = true;
+        throw ex;
+      } catch(IOException e){
+        throw new ImpersonationFailed("Failed writing as user '" + username +
+          "' to endPoint :" + endPt + ". Transaction Id: "
+          + getCurrentTxnId(), e);
+      }
+      finally {
+        markDead(success);
+      }
+    }
+
+    private void writeImpl(Collection<byte[]> records)
+            throws StreamingException {
+      for (byte[] record : records) {
+        recordWriter.write(getCurrentWriteId(), record);
+      }
+    }
+
+
+    /**
+     * Commit the currently open transaction
+     * @throws TransactionError
+     * @throws StreamingIOFailure  if flushing records failed
+     * @throws ImpersonationFailed if not able to commit on behalf of proxyUser
+     * @throws InterruptedException
+     */
+    @Override
+    public void commit()  throws TransactionError, StreamingException,
+           ImpersonationFailed, InterruptedException {
+      checkIsClosed();
+      boolean success = false;
+      try {
+        if (ugi == null) {
+          commitImpl();
+        }
+        else {
+          ugi.doAs(
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws StreamingException {
+                commitImpl();
+                return null;
+              }
+            }
+          );
+        }
+        success = true;
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed committing Txn ID " + 
getCurrentTxnId() + " as user '"
+                + username + "'on endPoint :" + endPt + ". Transaction Id: ", 
e);
+      }
+      finally {
+        markDead(success);
+      }
+    }
+
+    private void commitImpl() throws TransactionError, StreamingException {
+      try {
+        recordWriter.flush();
+        msClient.commitTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+        state = TxnState.COMMITTED;
+        txnStatus[currentTxnIndex] = TxnState.COMMITTED;
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TxnAbortedException e) {
+        throw new TransactionError("Aborted transaction cannot be committed"
+                , e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to commit transaction"
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    /**
+     * Abort the currently open transaction
+     * @throws TransactionError
+     */
+    @Override
+    public void abort() throws TransactionError, StreamingException
+                      , ImpersonationFailed, InterruptedException {
+      if(isClosed) {
+        /**
+         * isDead is only set internally by this class.  {@link #markDead(boolean)} will abort all
+         * remaining txns, so make this no-op to make sure that a well-behaved client that calls
+         * abort() doesn't get misleading errors
+         */
+        return;
+      }
+      abort(false);
+    }
+    private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException
+        , ImpersonationFailed, InterruptedException {
+      if (ugi==null) {
+        abortImpl(abortAllRemaining);
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    abortImpl(abortAllRemaining);
+                    return null;
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed aborting Txn " + 
getCurrentTxnId()  + " as user '"
+                + username + "' on endPoint :" + endPt, e);
+      }
+    }
+
+    private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException {
+      try {
+        if(abortAllRemaining) {
+          //when last txn finished (abort/commit) the currentTxnIndex is pointing at that txn
+          //so we need to start from next one, if any.  Also if batch was created but
+          //beginNextTransaction() was never called, we want to start with first txn
+          int minOpenTxnIndex = Math.max(currentTxnIndex +
+            (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
+          for(currentTxnIndex = minOpenTxnIndex;
+              currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+            msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+            txnStatus[currentTxnIndex] = TxnState.ABORTED;
+          }
+          currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
+        }
+        else {
+          if (getCurrentTxnId() > 0) {
+            msClient.rollbackTxn(getCurrentTxnId());
+            txnStatus[currentTxnIndex] = TxnState.ABORTED;
+          }
+        }
+        state = TxnState.ABORTED;
+        recordWriter.clear();
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Unable to abort invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to abort transaction id : "
+                + getCurrentTxnId(), e);
+      }
+    }
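+
+    // Editor's note - worked example for abortImpl(true) (hypothetical values): with a batch
+    // of 5 txns and currentTxnIndex == 2 in state COMMITTED, minOpenTxnIndex is 3, the txns
+    // at indexes 3 and 4 are rolled back, and currentTxnIndex is left at 4.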
+
+    @Override
+    public void heartbeat() throws StreamingException, HeartBeatFailure {
+      if(isClosed) {
+        return;
+      }
+      if(state != TxnState.OPEN && currentTxnIndex >= txnToWriteIds.size() - 
1) {
+        //here means last txn in the batch is resolved but the close() hasn't 
been called yet so
+        //there is nothing to heartbeat
+        return;
+      }
+      //if here after commit()/abort() but before next beginNextTransaction(), 
currentTxnIndex still
+      //points at the last txn which we don't want to heartbeat
+      Long first = txnToWriteIds.get(state == TxnState.OPEN ? currentTxnIndex 
: currentTxnIndex + 1).getTxnId();
+      Long last = txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId();
+      try {
+        HeartbeatTxnRangeResponse resp = 
heartbeaterMSClient.heartbeatTxnRange(first, last);
+        if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+          throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
+        }
+      } catch (TException e) {
+        throw new StreamingException("Failure to heartbeat on ids (" + first + 
"src/gen/thrift"
+                + last + ") on end point : " + endPt );
+      }
+    }
+
+    @Override
+    public boolean isClosed() {
+      return isClosed;
+    }
+    /**
+     * Close the TransactionBatch.  This will abort any still open txns in this batch.
+     * @throws StreamingIOFailure I/O failure when closing transaction batch
+     */
+    @Override
+    public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
+      if(isClosed) {
+        return;
+      }
+      isClosed = true;
+      abortImpl(true);//abort proactively so that we don't wait for timeout
+      closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which
+      //will call RecordUpdater.close(boolean abort)
+    }
+    private void closeImpl() throws StreamingException, InterruptedException{
+      state = TxnState.INACTIVE;
+      if(ugi == null) {
+        recordWriter.closeBatch();
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    recordWriter.closeBatch();
+                    return null;
+                  }
+                }
+        );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, 
exception);
+        }
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed closing Txn Batch as user '" + 
username +
+                "' on  endPoint :" + endPt, e);
+      }
+    }
+
+    private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
+            String partNameForLock, String user, long txnId, String agentInfo) {
+      LockRequestBuilder rqstBuilder = agentInfo == null ?
+        new LockRequestBuilder() : new LockRequestBuilder(agentInfo);
+      rqstBuilder.setUser(user);
+      rqstBuilder.setTransactionId(txnId);
+
+      LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+              .setDbName(hiveEndPoint.database)
+              .setTableName(hiveEndPoint.table)
+              .setShared()
+              .setOperationType(DataOperationType.INSERT);
+      if (partNameForLock!=null && !partNameForLock.isEmpty() ) {
+          lockCompBuilder.setPartitionName(partNameForLock);
+      }
+      rqstBuilder.addLockComponent(lockCompBuilder.build());
+
+      return rqstBuilder.build();
+    }
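+
+    // Editor's note: the request built above asks for a shared lock with operation type
+    // INSERT on the table, narrowed to one partition when partNameForLock is set, so
+    // concurrent writers to other partitions are not blocked.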
+  } // class TransactionBatchImpl
+
+  static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
+    HiveConf conf = new HiveConf(clazz);
+    if (metaStoreUri!= null) {
+      setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    HiveEndPoint.overrideConfSettings(conf);
+    return conf;
+  }
+
+  private static void overrideConfSettings(HiveConf conf) {
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER,
+            "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    // Avoids creating Tez Client sessions internally as it takes much longer currently
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+  }
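+
+  // Editor's note - the overrides above correspond to these hive-site.xml style properties
+  // (listed only for reference):
+  //   hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
+  //   hive.support.concurrency = true
+  //   hive.metastore.execute.setugi = true
+  //   hive.execution.engine = mr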
+
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
+    if( LOG.isDebugEnabled() ) {
+      LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+    }
+    conf.setVar(var, value);
+  }
+
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
+    if( LOG.isDebugEnabled() ) {
+      LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+    }
+    conf.setBoolVar(var, value);
+  }
+
+}  // class HiveEndPoint

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java b/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
new file mode 100644
index 0000000..23e17e7
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ImpersonationFailed extends StreamingException {
+  public ImpersonationFailed(String username, Exception e) {
+    super("Failed to impersonate user " + username, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java b/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
new file mode 100644
index 0000000..0011b14
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidColumn extends StreamingException {
+
+  public InvalidColumn(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java b/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
new file mode 100644
index 0000000..f1f9804
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidPartition extends StreamingException {
+
+  public InvalidPartition(String partitionName, String partitionValue) {
+    super("Invalid partition: Name=" + partitionName +
+            ", Value=" + partitionValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
new file mode 100644
index 0000000..ef1c91d
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTable extends StreamingException {
+
+  private static String makeMsg(String db, String table) {
+    return "Invalid table db:" + db + ", table:" + table;
+  }
+
+  public InvalidTable(String db, String table) {
+    super(makeMsg(db,table), null);
+  }
+
+  public InvalidTable(String db, String table, String msg) {
+    super(makeMsg(db, table) + ": " + msg, null);
+  }
+
+  public InvalidTable(String db, String table, Exception inner) {
+    super(makeMsg(db, table) + ": " + inner.getMessage(), inner);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
new file mode 100644
index 0000000..762f5f8
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTrasactionState extends TransactionError {
+  public InvalidTrasactionState(String msg) {
+    super(msg);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
new file mode 100644
index 0000000..5f9aca6
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class PartitionCreationFailed extends StreamingException {
+  public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) {
+    super("Failed to create partition " + endPoint, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java b/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
new file mode 100644
index 0000000..ccd3ae0
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class QueryFailedException extends StreamingException {
+  private final String query;
+
+  public QueryFailedException(String query, Exception e) {
+    super("Query failed: " + query + ". Due to :" + e.getMessage(), e);
+    this.query = query;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
new file mode 100644
index 0000000..dc6d70e
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public interface RecordWriter {
+
+  /** Writes using a Hive RecordUpdater.
+   *
+   * @param writeId the table write ID corresponding to the Txn in which the write occurs
+   * @param record the record to be written
+   */
+  void write(long writeId, byte[] record) throws StreamingException;
+
+  /** Flush records from buffer. Invoked by TransactionBatch.commit() */
+  void flush() throws StreamingException;
+
+  /** Clear buffered writes. Invoked by TransactionBatch.abort() */
+  void clear() throws StreamingException;
+
+  /** Acquire a new RecordUpdater. Invoked when
+   * StreamingConnection.fetchTransactionBatch() is called */
+  void newBatch(Long minWriteId, Long maxWriteId) throws StreamingException;
+
+  /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
+  void closeBatch() throws StreamingException;
+}
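
For illustration, a minimal sketch of a RecordWriter implementation showing the
lifecycle the interface implies: newBatch() before any write(), flush() on
commit, clear() on abort, closeBatch() at the end. The class name is
hypothetical and nothing here drives a real Hive RecordUpdater.

  // Hypothetical skeleton; a real writer would encode records and hand them
  // to a RecordUpdater per bucket, as the AbstractRecordWriter subclasses do.
  public class NoOpRecordWriter implements RecordWriter {
    private boolean batchOpen = false;

    @Override
    public void write(long writeId, byte[] record) throws StreamingException {
      if (!batchOpen) {
        throw new StreamingException("write() called outside an open batch");
      }
      // encode and route the record here
    }

    @Override
    public void flush() throws StreamingException {
      // push buffered rows to storage; called on TransactionBatch.commit()
    }

    @Override
    public void clear() throws StreamingException {
      // drop buffered rows; called on TransactionBatch.abort()
    }

    @Override
    public void newBatch(Long minWriteId, Long maxWriteId) throws StreamingException {
      batchOpen = true;  // allocate per-batch state here
    }

    @Override
    public void closeBatch() throws StreamingException {
      batchOpen = false;  // release per-batch state here
    }
  }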

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/SerializationError.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/SerializationError.java 
b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
new file mode 100644
index 0000000..a57ba00
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class SerializationError extends StreamingException {
+  public SerializationError(String msg, Exception e) {
+    super(msg, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
new file mode 100644
index 0000000..2f760ea
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
+ * Note: the expectation is that there is at most 1 TransactionBatch outstanding for any given
+ * StreamingConnection.  Violating this may result in "out of sequence response".
+ */
+public interface StreamingConnection {
+
+  /**
+   * Acquires a new batch of transactions from Hive.
+   *
+   * @param numTransactionsHint a hint from the client indicating how many transactions it needs
+   * @param writer  used to write records. The same writer instance can
+   *                be shared with another TransactionBatch (to the same endpoint)
+   *                only after the first TransactionBatch has been closed.
+   *                The writer will be closed when the TransactionBatch is closed.
+   * @return a batch of transactions
+   * @throws ConnectionError
+   * @throws StreamingException
+   * @throws InterruptedException
+   */
+  TransactionBatch fetchTransactionBatch(int numTransactionsHint,
+                                         RecordWriter writer)
+      throws ConnectionError, StreamingException, InterruptedException;
+
+  /**
+   * Close the connection.
+   */
+  void close();
+
+  /**
+   * @return the UserGroupInformation associated with this connection, or {@code null} if there is none
+   */
+  UserGroupInformation getUserGroupInformation();
+}
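
For illustration, the intended call sequence against this interface, as a
hedged sketch: the metastore URI, database, table and partition values are
placeholders, and endPoint.newConnection(...) is assumed from the HiveEndPoint
class earlier in this patch.

  HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083",
      "default", "alerts", Arrays.asList("2018", "04"));  // placeholder values
  StreamingConnection conn = endPoint.newConnection(true); // assumed overload
  RecordWriter writer = new StrictJsonWriter(endPoint, conn);
  TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
  try {
    // drive the batch; see the TransactionBatch sketch further down
  } finally {
    batch.close();  // also closes the writer
    conn.close();
  }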

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StreamingException.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StreamingException.java 
b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
new file mode 100644
index 0000000..a7f84c1
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class StreamingException extends Exception {
+  public StreamingException(String msg, Exception cause) {
+    super(msg, cause);
+  }
+  public StreamingException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java 
b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
new file mode 100644
index 0000000..0dfbfa7
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class StreamingIOFailure extends StreamingException {
+
+  public StreamingIOFailure(String msg, Exception cause) {
+    super(msg, cause);
+  }
+
+  public StreamingIOFailure(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java 
b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
new file mode 100644
index 0000000..0077913
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
+import org.apache.hive.hcatalog.data.JsonSerDe;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming writer that handles UTF-8 encoded JSON (strict syntax).
+ * Uses org.apache.hive.hcatalog.data.JsonSerDe to process JSON input.
+ */
+public class StrictJsonWriter extends AbstractRecordWriter {
+  private JsonSerDe serde;
+
+  private final HCatRecordObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
+  /**
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(endPoint, null, null);
+  }
+
+  /**
+   * @deprecated As of release 1.3/2.1. Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
+    this(endPoint, conf, null);
+  }
+  /**
+   * @param endPoint the end point to write to
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    this(endPoint, null, conn);
+  }
+  /**
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf, conn);
+    this.serde = createSerde(tbl, conf);
+    // get ObjectInspectors for the entire record and the bucketed cols
+    try {
+      recordObjInspector = (HCatRecordObjectInspector) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  @Override
+  protected HCatRecordObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  @Override
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+
+  @Override
+  public void write(long writeId, byte[] record)
+          throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction write id ("
+              + writeId + ")", e);
+    }
+  }
+
+  /**
+   * Creates the JsonSerDe
+   * @param tbl   used to create serde
+   * @param conf  used to create serde
+   * @return the initialized JsonSerDe
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static JsonSerDe createSerde(Table tbl, HiveConf conf)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      JsonSerDe serde = new JsonSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + 
JsonSerDe.class.getName(), e);
+    }
+  }
+
+  @Override
+  public Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into 
Object", e);
+    }
+  }
+
+}
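
For illustration, feeding one UTF-8 JSON record through this writer inside an
open transaction; the column names are assumptions about the table schema, and
txnBatch is a TransactionBatch fetched with a StrictJsonWriter as sketched
above.

  byte[] json = "{\"id\": 1, \"msg\": \"hello\"}"
      .getBytes(java.nio.charset.StandardCharsets.UTF_8);
  txnBatch.write(json);  // delegates to StrictJsonWriter.write(writeId, record)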

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java 
b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
new file mode 100644
index 0000000..c0b7324
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Streaming writer that handles text input data via a regex. Uses
+ * org.apache.hadoop.hive.serde2.RegexSerDe
+ */
+public class StrictRegexWriter extends AbstractRecordWriter {
+  private RegexSerDe serde;
+  private final StructObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
+  /**
+   * @param endPoint the end point to write to
+   * @param conn     connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, null, conn);
+  }
+
+  /**
+   * @param endPoint the end point to write to
+   * @param conf     a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn     connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, conf, conn);
+  }
+
+  /**
+   * @param regex    regex used to parse the data
+   * @param endPoint the end point to write to
+   * @param conf     a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn     connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf, conn);
+    this.serde = createSerde(tbl, conf, regex);
+    // get ObjectInspectors for the entire record and the bucketed cols
+    try {
+      recordObjInspector = (StructObjectInspector) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  @Override
+  protected StructObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  @Override
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+
+  @Override
+  public void write(long writeId, byte[] record)
+    throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction write id ("
+        + writeId + ")", e);
+    }
+  }
+
+  /**
+   * Creates RegexSerDe
+   *
+   * @param tbl   used to create serde
+   * @param conf  used to create serde
+   * @param regex used to create serde
+   * @return the initialized RegexSerDe
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
+    throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
+      ArrayList<String> tableColumns = getCols(tbl);
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+      RegexSerDe serde = new RegexSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + 
RegexSerDe.class.getName(), e);
+    }
+  }
+
+  private static ArrayList<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      colNames.add(col.getName().toLowerCase());
+    }
+    return colNames;
+  }
+
+  /**
+   * Encode UTF-8 encoded string bytes using RegexSerDe
+   *
+   * @param utf8StrRecord the UTF-8 encoded record bytes
+   * @return the encoded object
+   * @throws SerializationError if the record cannot be deserialized
+   */
+  @Override
+  public Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into 
Object", e);
+    }
+  }
+
+}
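
For illustration, a hedged sketch of the regex-driven variant; the pattern and
the two-column table layout are assumptions, not part of this patch. Per
RegexSerDe semantics, each capture group maps to one table column.

  String regex = "([^,]*),(.*)";  // assumed two-column schema
  StrictRegexWriter writer = new StrictRegexWriter(regex, endPoint, null, conn);
  TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
  batch.beginNextTransaction();
  batch.write("red,stop".getBytes(java.nio.charset.StandardCharsets.UTF_8));
  batch.commit();
  batch.close();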

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java 
b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
new file mode 100644
index 0000000..2b05771
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import java.util.Collection;
+
+/**
+ * Represents a set of Transactions returned by Hive. Supports opening, writing to
+ * and committing/aborting each transaction. The interface is designed to ensure
+ * transactions in a batch are used up sequentially. To stream to the same HiveEndPoint
+ * concurrently, create separate StreamingConnections.
+ *
+ * Note on thread safety: At most 2 threads can run through a given TransactionBatch at the same
+ * time.  One thread may call {@link #heartbeat()} while the other calls all other methods.
+ * Violating this may result in "out of sequence response".
+ *
+ */
+public interface TransactionBatch  {
+  enum TxnState {
+    INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
+
+    private final String code;
+    TxnState(String code) {
+      this.code = code;
+    }
+
+    @Override
+    public String toString() {
+      return code;
+    }
+  }
+
+  /**
+   * Activate the next available transaction in the current transaction batch.
+   * @throws StreamingException if not able to switch to the next Txn
+   * @throws InterruptedException if the call is interrupted
+   */
+  void beginNextTransaction() throws StreamingException, InterruptedException;
+
+  /**
+   * Get the ID of the currently open transaction.
+   * @return the transaction id
+   */
+  Long getCurrentTxnId();
+
+
+  /**
+   * Get the write ID mapped to the currently open transaction.
+   * @return the write id
+   */
+  Long getCurrentWriteId();
+
+  /**
+   * Get the state of the current transaction.
+   */
+  TxnState getCurrentTransactionState();
+
+  /**
+   * Commit the currently open transaction.
+   * @throws StreamingException if there are errors committing
+   * @throws InterruptedException if the call is interrupted
+   */
+  void commit() throws StreamingException, InterruptedException;
+
+  /**
+   * Abort the currently open transaction.
+   * @throws StreamingException if there are errors
+   * @throws InterruptedException if the call is interrupted
+   */
+  void abort() throws StreamingException, InterruptedException;
+
+  /**
+   * Remaining transactions are those that have not yet been committed, aborted, or opened.
+   * The currently open transaction is not counted among the remaining txns.
+   * @return the number of transactions remaining in this batch.
+   */
+  int remainingTransactions();
+
+
+  /**
+   * Write a record using the RecordWriter.
+   * @param record  the data to be written
+   * @throws StreamingException if there are errors when writing
+   * @throws InterruptedException if the call is interrupted
+   */
+  void write(byte[] record) throws StreamingException, InterruptedException;
+
+  /**
+   * Write records using the RecordWriter.
+   * @param records  the data to be written
+   * @throws StreamingException if there are errors when writing
+   * @throws InterruptedException if the call is interrupted
+   */
+  void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
+
+
+  /**
+   * Issues a heartbeat to the Hive metastore on the current and remaining txn ids
+   * to keep them from expiring.
+   * @throws StreamingException if there are errors
+   */
+  void heartbeat() throws StreamingException;
+
+  /**
+   * Close the TransactionBatch.
+   * @throws StreamingException if there are errors closing batch
+   * @throws InterruptedException if the call is interrupted
+   */
+  void close() throws StreamingException, InterruptedException;
+
+  boolean isClosed();
+}
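
For illustration, the drain-the-batch loop this interface implies, with error
handling elided; the connection and writer setup are sketched above, and
record is a placeholder byte[].

  TransactionBatch batch = conn.fetchTransactionBatch(10, writer);
  while (batch.remainingTransactions() > 0) {
    batch.beginNextTransaction();  // activate the next txn in the batch
    batch.write(record);           // one or more writes per transaction
    batch.commit();                // flushes the RecordWriter
  }
  batch.close();                   // closes the batch and the writer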

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java 
b/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
new file mode 100644
index 0000000..a8c8cd4
--- /dev/null
+++ 
b/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class TransactionBatchUnAvailable extends StreamingException {
+  public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) {
+    super("Unable to acquire transaction batch on end point: " + ep, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6bd32a0d/streaming/src/java/org/apache/hive/streaming/TransactionError.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java 
b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
new file mode 100644
index 0000000..a331b20
--- /dev/null
+++ b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class TransactionError extends StreamingException {
+  public TransactionError(String msg, Exception e) {
+    super(msg + (e == null ? "" : ": " + e.getMessage()), e);
+  }
+
+  public TransactionError(String msg) {
+    super(msg);
+  }
+}
