Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master ef12eb0cf -> 422f5d946


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
new file mode 100755
index 0000000..6360768
--- /dev/null
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * An implementation of an FS writer that writes text files to HDFS; the files
+ * are inserted into hive on the committed window callback. HiveStreamCodec is
+ * used to make sure that data being sent to a particular hive partition goes
+ * to a specific operator partition, by passing this FSRollingOutputOperator to
+ * the stream codec. The file name is also determined uniquely for each tuple
+ * going to a specific hive partition.
+ *
+ * @since 2.1.0
+ */
+public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T>
+    implements CheckpointListener
+{
+  private transient String outputFilePath;
+  protected MutableInt partNumber;
+  protected HashMap<Long, ArrayList<String>> mapFilenames = new HashMap<Long, ArrayList<String>>();
+  protected HashMap<String, ArrayList<String>> mapPartition = new HashMap<String, ArrayList<String>>();
+  protected Queue<Long> queueWindows = new LinkedList<Long>();
+  protected long windowIDOfCompletedPart = Stateless.WINDOW_ID;
+  protected long committedWindowId = Stateless.WINDOW_ID;
+  private boolean isEmptyWindow;
+  private transient int operatorId;
+  private int countEmptyWindow;
+  private ArrayList<String> partition = new ArrayList<String>();
+
+  //This variable is user configurable.
+  @Min(0)
+  private long maxWindowsWithNoData = 100;
+
+  /**
+   * The output port that emits a POJO containing the committed file and the
+   * specific hive partitions into which that file should be loaded; it is
+   * consumed by HiveOperator.
+   */
+  public final transient DefaultOutputPort<FilePartitionMapping> outputPort = new DefaultOutputPort<FilePartitionMapping>();
+
+  public AbstractFSRollingOutputOperator()
+  {
+    countEmptyWindow = 0;
+    HiveStreamCodec<T> hiveCodec = new HiveStreamCodec<T>();
+    hiveCodec.rollingOperator = this;
+    streamCodec = hiveCodec;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    String appId = context.getValue(DAG.APPLICATION_ID);
+    operatorId = context.getId();
+    outputFilePath = File.separator + appId + File.separator + operatorId;
+    super.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    isEmptyWindow = true;
+    windowIDOfCompletedPart = windowId;
+  }
+
+  /*
+   * mapFilenames maps each completed file to the window id in which it was completed.
+   * A queue of these window ids is also maintained, which reduces iteration time in the committed callback.
+   */
+  @Override
+  protected void rotateHook(String finishedFile)
+  {
+    isEmptyWindow = false;
+    if (mapFilenames.containsKey(windowIDOfCompletedPart)) {
+      mapFilenames.get(windowIDOfCompletedPart).add(finishedFile);
+    } else {
+      ArrayList<String> listFileNames = new ArrayList<String>();
+      listFileNames.add(finishedFile);
+      mapFilenames.put(windowIDOfCompletedPart, listFileNames);
+    }
+    queueWindows.add(windowIDOfCompletedPart);
+
+  }
+
+  /*
+   * File names include the operator id and the specific hive partitions to which the file will be loaded.
+   * The partition is determined from the tuple; its implementation is left to the user.
+   */
+  @Override
+  protected String getFileName(T tuple)
+  {
+    isEmptyWindow = false;
+    partition = getHivePartition(tuple);
+    StringBuilder output = new StringBuilder(outputFilePath);
+    int numPartitions = partition.size();
+    if (numPartitions > 0) {
+      for (int i = 0; i < numPartitions; i++) {
+        output.append(File.separator).append(partition.get(i));
+      }
+      output.append(File.separator).append(operatorId).append("-transaction.out.part");
+      String partFile = getPartFileNamePri(output.toString());
+      mapPartition.put(partFile, partition);
+    }
+    return output.toString();
+  }
+
+  /*
+   * Completed files are moved into hive on the committed window callback.
+   * A file is moved only if the window id in which it was completed is less
+   * than or equal to the committed window id.
+   */
+  @Override
+  public void committed(long windowId)
+  {
+    committedWindowId = windowId;
+    Iterator<Long> iterWindows = queueWindows.iterator();
+    ArrayList<String> list = new ArrayList<String>();
+    while (iterWindows.hasNext()) {
+      windowId = iterWindows.next();
+      if (committedWindowId >= windowId) {
+        logger.debug("list is {}", mapFilenames.get(windowId));
+        list = mapFilenames.get(windowId);
+        FilePartitionMapping partMap = new FilePartitionMapping();
+        if (list != null) {
+          for (int i = 0; i < list.size(); i++) {
+            partMap.setFilename(list.get(i));
+            partMap.setPartition(mapPartition.get(list.get(i)));
+            outputPort.emit(partMap);
+          }
+        }
+        mapFilenames.remove(windowId);
+        iterWindows.remove();
+      }
+      if (committedWindowId < windowId) {
+        break;
+      }
+    }
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  protected void rotateCall(String lastFile)
+  {
+    try {
+      this.rotate(lastFile);
+    } catch (IOException ex) {
+      logger.debug(ex.getMessage());
+      DTThrowable.rethrow(ex);
+    } catch (ExecutionException ex) {
+      logger.debug(ex.getMessage());
+      DTThrowable.rethrow(ex);
+    }
+  }
+
+  public String getHDFSRollingLastFile()
+  {
+    Iterator<String> iterFileNames = this.openPart.keySet().iterator();
+    String lastFile = null;
+    if (iterFileNames.hasNext()) {
+      lastFile = iterFileNames.next();
+      partNumber = this.openPart.get(lastFile);
+    }
+    return getPartFileName(lastFile, partNumber.intValue());
+  }
+
+  /**
+   * This method gets a list of hive partitions to which the tuple needs to be
+   * written. Example: if the hive partitions are date='2014-12-12',country='USA',
+   * then this method returns {"2014-12-12","USA"}. The implementation is left
+   * to the user.
+   * 
+   * @param tuple
+   *          A received tuple to be written to a hive partition.
+   * @return ArrayList containing hive partition values.
+   */
+  public abstract ArrayList<String> getHivePartition(T tuple);
+
+  @Override
+  public void endWindow()
+  {
+    if (isEmptyWindow) {
+      countEmptyWindow++;
+    }
+    if (countEmptyWindow >= maxWindowsWithNoData) {
+      String lastFile = getHDFSRollingLastFile();
+      rotateCall(lastFile);
+      countEmptyWindow = 0;
+    }
+    super.endWindow();
+
+  }
+
+  public long getMaxWindowsWithNoData()
+  {
+    return maxWindowsWithNoData;
+  }
+
+  public void setMaxWindowsWithNoData(long maxWindowsWithNoData)
+  {
+    this.maxWindowsWithNoData = maxWindowsWithNoData;
+  }
+
+  /*
+   * A POJO which is emitted by the output port of the AbstractFSRollingOutputOperator implementation in the DAG.
+   * The POJO contains the filename, which will not be changed by FSRollingOutputOperator once it is emitted.
+   * The POJO also contains the hive partitions to which the respective files will be moved.
+   */
+  public static class FilePartitionMapping
+  {
+    private String filename;
+    private ArrayList<String> partition = new ArrayList<String>();
+
+    public ArrayList<String> getPartition()
+    {
+      return partition;
+    }
+
+    public void setPartition(ArrayList<String> partition)
+    {
+      this.partition = partition;
+    }
+
+    public String getFilename()
+    {
+      return filename;
+    }
+
+    public void setFilename(String filename)
+    {
+      this.filename = filename;
+    }
+
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(AbstractFSRollingOutputOperator.class);
+
+}
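The contract above boils down to two abstract hooks: getHivePartition() chooses the hive partition values for a tuple and getBytesForTuple() serializes it, while the base class rolls files and emits FilePartitionMapping on the committed callback. A minimal sketch of a concrete subclass, assuming String tuples of the form "2014-12-12,payload" whose date prefix names the partition (the class and the tuple format are illustrative, not part of this commit):

import java.util.ArrayList;

import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator;

// Illustrative subclass: routes each "date,payload" tuple to the hive
// partition named by its date prefix and writes one tuple per line.
public class DatePartitionedRollingOutput extends AbstractFSRollingOutputOperator<String>
{
  @Override
  public ArrayList<String> getHivePartition(String tuple)
  {
    ArrayList<String> partitions = new ArrayList<String>();
    partitions.add(tuple.substring(0, tuple.indexOf(',')));
    return partitions;
  }

  @Override
  protected byte[] getBytesForTuple(String tuple)
  {
    return (tuple + "\n").getBytes();
  }
}

In a DAG, this operator's outputPort would then be connected to HiveOperator's input port, which is how HiveOutputModule wires the two operators later in this commit.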

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java
new file mode 100644
index 0000000..eaae68f
--- /dev/null
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterChar;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterShort;
+
+/**
+ * An implementation of AbstractFSRollingOutputOperator which takes any POJO as
+ * input, serializes the POJO as Hive delimiter-separated values that are
+ * written as text files to HDFS, and loads them into hive on the committed
+ * window callback. This operator can handle outputting to multiple files when
+ * the output file depends on the tuple.
+ *
+ * @displayName FS To Hive Operator
+ * @category Output
+ * @tags fs, hive, database
+ * @since 3.0.0
+ */
+public class FSPojoToHiveOperator extends AbstractFSRollingOutputOperator<Object>
+{
+  private static final long serialVersionUID = 1L;
+  private ArrayList<String> hivePartitionColumns;
+  private ArrayList<String> hiveColumns;
+  private ArrayList<FIELD_TYPE> hiveColumnDataTypes;
+  private ArrayList<FIELD_TYPE> hivePartitionColumnDataTypes;
+  private transient ArrayList<Object> getters = Lists.newArrayList();
+  private ArrayList<String> expressionsForHiveColumns;
+  private ArrayList<String> expressionsForHivePartitionColumns;
+
+  public ArrayList<String> getExpressionsForHivePartitionColumns()
+  {
+    return expressionsForHivePartitionColumns;
+  }
+
+  public void setExpressionsForHivePartitionColumns(ArrayList<String> expressionsForHivePartitionColumns)
+  {
+    this.expressionsForHivePartitionColumns = expressionsForHivePartitionColumns;
+  }
+
+  /*
+   * A list of Java expressions, each of which yields the value of a specific table column or partition column of the hive table from the input POJO.
+   */
+  public ArrayList<String> getExpressionsForHiveColumns()
+  {
+    return expressionsForHiveColumns;
+  }
+
+  public void setExpressionsForHiveColumns(ArrayList<String> expressions)
+  {
+    this.expressionsForHiveColumns = expressions;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void getValue(Object tuple, int index, FIELD_TYPE type, StringBuilder value)
+  {
+    switch (type) {
+      case CHARACTER:
+        value.append(((GetterChar<Object>)getters.get(index)).get(tuple));
+        break;
+      case STRING:
+        value.append(((Getter<Object, String>)getters.get(index)).get(tuple));
+        break;
+      case BOOLEAN:
+        value.append(((GetterBoolean<Object>)getters.get(index)).get(tuple));
+        break;
+      case SHORT:
+        value.append(((GetterShort<Object>)getters.get(index)).get(tuple));
+        break;
+      case INTEGER:
+        value.append(((GetterInt<Object>)getters.get(index)).get(tuple));
+        break;
+      case LONG:
+        value.append(((GetterLong<Object>)getters.get(index)).get(tuple));
+        break;
+      case FLOAT:
+        value.append(((GetterFloat<Object>)getters.get(index)).get(tuple));
+        break;
+      case DOUBLE:
+        value.append(((GetterDouble<Object>)getters.get(index)).get(tuple));
+        break;
+      case DATE:
+        value.append(((Getter<Object, Date>)getters.get(index)).get(tuple));
+        break;
+      case TIMESTAMP:
+        value.append(((Getter<Object, Timestamp>)getters.get(index)).get(tuple));
+        break;
+      case OTHER:
+        value.append(((Getter<Object, Object>)getters.get(index)).get(tuple));
+        break;
+      default:
+        throw new RuntimeException("unsupported data type " + type);
+    }
+  }
+
+  public static enum FIELD_TYPE
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE, TIMESTAMP, OTHER
+  }
+
+  /*
+   * Columns in Hive table.
+   */
+  public ArrayList<String> getHiveColumns()
+  {
+    return hiveColumns;
+  }
+
+  public void setHiveColumns(ArrayList<String> hiveColumns)
+  {
+    this.hiveColumns = hiveColumns;
+  }
+
+  /*
+   * Partition columns in the Hive table. Example: dt for date, ts for timestamp.
+   */
+  public ArrayList<String> getHivePartitionColumns()
+  {
+    return hivePartitionColumns;
+  }
+
+  public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns)
+  {
+    this.hivePartitionColumns = hivePartitionColumns;
+  }
+
+  /*
+   * Data Types of Hive table data columns.
+   * Example: If the Hive table has two columns of data type int and float,
+   * then hiveColumnDataTypes = {INTEGER, FLOAT}.
+   * Particular Data Type can be chosen from the List of data types provided.
+   */
+  public ArrayList<FIELD_TYPE> getHiveColumnDataTypes()
+  {
+    return hiveColumnDataTypes;
+  }
+
+  public void setHiveColumnDataTypes(ArrayList<FIELD_TYPE> hiveColumnDataTypes)
+  {
+    this.hiveColumnDataTypes = hiveColumnDataTypes;
+  }
+
+  /*
+   * Data Types of Hive table Partition Columns.
+   * Example: If the Hive table has two columns of data type int and float and is partitioned by date of type String,
+   * then hivePartitionColumnDataTypes = {STRING}.
+   * Particular Data Type can be chosen from the List of data types provided.
+   */
+  public ArrayList<FIELD_TYPE> getHivePartitionColumnDataTypes()
+  {
+    return hivePartitionColumnDataTypes;
+  }
+
+  public void setHivePartitionColumnDataTypes(ArrayList<FIELD_TYPE> hivePartitionColumnDataTypes)
+  {
+    this.hivePartitionColumnDataTypes = hivePartitionColumnDataTypes;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public ArrayList<String> getHivePartition(Object tuple)
+  {
+    if (getters.isEmpty()) {
+      processFirstTuple(tuple);
+    }
+
+    int sizeOfColumns = hiveColumns.size();
+    int sizeOfPartitionColumns = hivePartitionColumns.size();
+    //int size = sizeOfColumns + sizeOfPartitionColumns;
+    ArrayList<String> hivePartitionColumnValues = new ArrayList<String>();
+    String partitionColumnValue;
+    for (int i = 0; i < sizeOfPartitionColumns; i++) {
+      FIELD_TYPE type = hivePartitionColumnDataTypes.get(i);
+      StringBuilder result = new StringBuilder();
+      getValue(tuple, sizeOfColumns + i, type, result);
+      partitionColumnValue = result.toString();
+      //partitionColumnValue = (String)getters.get(i).get(tuple);
+      hivePartitionColumnValues.add(partitionColumnValue);
+    }
+    return hivePartitionColumnValues;
+  }
+
+  @Override
+  public void processTuple(Object tuple)
+  {
+    if (getters.isEmpty()) {
+      processFirstTuple(tuple);
+    }
+    super.processTuple(tuple);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void processFirstTuple(Object tuple)
+  {
+    Class<?> fqcn = tuple.getClass();
+    createGetters(fqcn, hiveColumns.size(), expressionsForHiveColumns, hiveColumnDataTypes);
+    createGetters(fqcn, hivePartitionColumns.size(), expressionsForHivePartitionColumns, hivePartitionColumnDataTypes);
+  }
+
+  protected void createGetters(Class<?> fqcn, int size, ArrayList<String> expressions,
+      ArrayList<FIELD_TYPE> columnDataTypes)
+  {
+    for (int i = 0; i < size; i++) {
+      FIELD_TYPE type = columnDataTypes.get(i);
+      final Object getter;
+      final String getterExpression = expressions.get(i);
+      switch (type) {
+        case CHARACTER:
+          getter = PojoUtils.createGetterChar(fqcn, getterExpression);
+          break;
+        case STRING:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+          break;
+        case BOOLEAN:
+          getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+          break;
+        case SHORT:
+          getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+          break;
+        case INTEGER:
+          getter = PojoUtils.createGetterInt(fqcn, getterExpression);
+          break;
+        case LONG:
+          getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+          break;
+        case FLOAT:
+          getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+          break;
+        case DOUBLE:
+          getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+          break;
+        case DATE:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Date.class);
+          break;
+        case TIMESTAMP:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Timestamp.class);
+          break;
+        default:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+      }
+
+      getters.add(getter);
+
+    }
+
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected byte[] getBytesForTuple(Object tuple)
+  {
+    int size = hiveColumns.size();
+    StringBuilder result = new StringBuilder();
+    for (int i = 0; i < size; i++) {
+      FIELD_TYPE type = hiveColumnDataTypes.get(i);
+      getValue(tuple, i, type, result);
+      result.append("\t");
+    }
+    result.append("\n");
+    return (result.toString()).getBytes();
+  }
+
+}
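To make the column/expression lists concrete: for a hypothetical POJO with fields id, name and dt, a sketch of the configuration could look as follows (all names and values are illustrative assumptions, not part of this commit):

import java.util.ArrayList;
import java.util.Arrays;

import com.datatorrent.contrib.hive.FSPojoToHiveOperator;
import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;

public class FSPojoToHiveOperatorExample
{
  public static FSPojoToHiveOperator configure()
  {
    FSPojoToHiveOperator op = new FSPojoToHiveOperator();

    // table columns (excluding partition columns), their types and getter expressions
    op.setHiveColumns(new ArrayList<String>(Arrays.asList("id", "name")));
    op.setHiveColumnDataTypes(new ArrayList<FIELD_TYPE>(Arrays.asList(FIELD_TYPE.INTEGER, FIELD_TYPE.STRING)));
    op.setExpressionsForHiveColumns(new ArrayList<String>(Arrays.asList("id", "name")));

    // one partition column "dt" of type STRING, read from the POJO's dt field
    op.setHivePartitionColumns(new ArrayList<String>(Arrays.asList("dt")));
    op.setHivePartitionColumnDataTypes(new ArrayList<FIELD_TYPE>(Arrays.asList(FIELD_TYPE.STRING)));
    op.setExpressionsForHivePartitionColumns(new ArrayList<String>(Arrays.asList("dt")));

    op.setFilePath("/tmp/hive-staging"); // illustrative HDFS staging directory
    return op;
  }
}

Each tuple is then serialized as the tab-separated values of the non-partition columns, one tuple per line, matching getBytesForTuple above.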

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
new file mode 100755
index 0000000..8e3b143
--- /dev/null
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitionMapping;
+import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.db.AbstractStoreOutputOperator;
+
+/**
+ * Hive operator which can insert data in text format into tables/partitions
+ * from a file written to an HDFS location. The file contains data of the same
+ * data types as the hive tables created by the user and is already committed.
+ * No changes will be made to the input file once it is given to HiveOperator.
+ * This is a fault-tolerant implementation of HiveOperator which assumes that
+ * the load operation is atomic in Hive.
+ *
+ * @category Output
+ * @tags database, sql, hive
+ *
+ * @since 2.1.0
+ */
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMapping, HiveStore>
+{
+  //This Property is user configurable.
+  protected ArrayList<String> hivePartitionColumns = new ArrayList<String>();
+  private transient String localString = "";
+
+  /**
+   * Hive store.
+   * 
+   * @deprecated use {@link AbstractStoreOutputOperator#store} instead
+   */
+  @Deprecated
+  protected HiveStore hivestore;
+
+  /**
+   * The file system used to write to.
+   */
+  protected transient FileSystem fs;
+  @Nonnull
+  protected String tablename;
+  @Nonnull
+  protected String hivepath;
+  /**
+   * This is the operator context passed at setup.
+   */
+  private transient OperatorContext context;
+
+  /**
+   * File output counters.
+   */
+  private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
+
+  /**
+   * The total time in milliseconds the operator has been running for.
+   */
+  private long totalTime;
+  /**
+   * Last time stamp collected.
+   */
+  private long lastTimeStamp;
+  /**
+   * The total number of bytes written by the operator.
+   */
+  protected long totalBytesWritten = 0;
+
+  public HiveOperator()
+  {
+    store = new HiveStore();
+    hivestore = store;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    try {
+      fs = getHDFSInstance();
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    this.context = context;
+    lastTimeStamp = System.currentTimeMillis();
+
+    fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong());
+    fileCounters.setCounter(Counters.TOTAL_TIME_ELAPSED, new MutableLong());
+    super.setup(context);
+  }
+
+  /**
+   * Override this method to change the FileSystem instance that is used by the
+   * operator.
+   *
+   * @return A FileSystem object.
+   * @throws IOException
+   */
+  protected FileSystem getHDFSInstance() throws IOException
+  {
+    FileSystem tempFS = FileSystem.newInstance(new Path(store.filepath).toUri(), new Configuration());
+    if (!tempFS.getScheme().equalsIgnoreCase("hdfs")) {
+      localString = " local";
+    }
+    return tempFS;
+  }
+
+  /**
+   * Function to process each incoming tuple. The input is a FilePartitionMapping,
+   * a POJO containing a filename which is already committed and will not be
+   * changed. The POJO also contains the hive partitions to which the
+   * respective files will be moved.
+   *
+   * @param tuple
+   *          incoming tuple which has filename and hive partition.
+   */
+  @Override
+  public void processTuple(FilePartitionMapping tuple)
+  {
+    String command = processHiveFile(tuple);
+    logger.debug("commands is {}", command);
+    if (command != null) {
+      Statement stmt;
+      try {
+        stmt = store.getConnection().createStatement();
+        stmt.execute(command);
+      } catch (SQLException ex) {
+        throw new RuntimeException("Moving file into hive failed" + ex);
+      }
+    }
+
+  }
+
+  /*
+   * This function extracts the filename and partitions to which the file will 
be loaded.
+   * It returns the command to be executed by hive.
+   */
+  private String processHiveFile(FilePartitionMapping tuple)
+  {
+    String filename = tuple.getFilename();
+    ArrayList<String> partition = tuple.getPartition();
+    String command = null;
+    String filepath = store.getFilepath() + Path.SEPARATOR + filename;
+    logger.debug("processing {} filepath", filepath);
+    int numPartitions = partition.size();
+    try {
+      if (fs.exists(new Path(filepath))) {
+        if (numPartitions > 0) {
+          StringBuilder partitionString = new StringBuilder(
+              hivePartitionColumns.get(0) + "='" + partition.get(0) + "'");
+          int i = 0;
+          while (i < numPartitions) {
+            i++;
+            if (i == numPartitions) {
+              break;
+            }
+            partitionString.append(",").append(hivePartitionColumns.get(i)).append("='").append(partition.get(i))
+                .append("'");
+          }
+          if (i < hivePartitionColumns.size()) {
+            partitionString.append(",").append(hivePartitionColumns.get(i));
+          }
+          command = "load data" + localString + " inpath '" + filepath + "' into table " + tablename + " PARTITION"
+              + "( " + partitionString + " )";
+        } else {
+          command = "load data" + localString + " inpath '" + filepath + "' into table " + tablename;
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    logger.debug("command is {}", command);
+    return command;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    long currentTimeStamp = System.currentTimeMillis();
+    totalTime += currentTimeStamp - lastTimeStamp;
+    lastTimeStamp = currentTimeStamp;
+
+    fileCounters.getCounter(Counters.TOTAL_TIME_ELAPSED).setValue(totalTime);
+    fileCounters.getCounter(Counters.TOTAL_BYTES_WRITTEN).setValue(totalBytesWritten);
+    context.setCounters(fileCounters);
+  }
+
+  public void teardown()
+  {
+    long currentTimeStamp = System.currentTimeMillis();
+    totalTime += currentTimeStamp - lastTimeStamp;
+    lastTimeStamp = currentTimeStamp;
+  }
+
+  /**
+   * Get the partition columns in hive to which data needs to be loaded.
+   * 
+   * @return List of Hive Partition Columns
+   */
+  public ArrayList<String> getHivePartitionColumns()
+  {
+    return hivePartitionColumns;
+  }
+
+  /**
+   * Set the hive partition columns to which data needs to be loaded.
+   * 
+   * @param hivePartitionColumns
+   */
+  public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns)
+  {
+    this.hivePartitionColumns = hivePartitionColumns;
+  }
+
+  /**
+   * Get the table name in hive.
+   * 
+   * @return table name
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  /**
+   * Set the table name in hive.
+   * 
+   * @param tablename
+   */
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  /**
+   * Gets the store set for hive.
+   * 
+   * @deprecated use {@link #getStore()} instead.
+   * @return hive store
+   */
+  @Deprecated
+  public HiveStore getHivestore()
+  {
+    return store;
+  }
+
+  /**
+   * Set the store in hive.
+   * 
+   * @deprecated use {@link #setStore(HiveStore)} instead.
+   * @param hivestore
+   */
+  @Deprecated
+  public void setHivestore(HiveStore hivestore)
+  {
+    this.hivestore = hivestore;
+    super.setStore(hivestore);
+  }
+
+  public static enum Counters
+  {
+    /**
+     * An enum for counters representing the total number of bytes written by
+     * the operator.
+     */
+    TOTAL_BYTES_WRITTEN,
+
+    /**
+     * An enum for counters representing the total time the operator has been
+     * operational for.
+     */
+    TOTAL_TIME_ELAPSED
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HiveOperator.class);
+}
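For a concrete picture of the statement built in processHiveFile: given a FilePartitionMapping whose file name is relative to the store's filepath and whose partition values line up with hivePartitionColumns, the operator issues a LOAD DATA statement. A small sketch, assuming hivePartitionColumns = ["dt"] and a table named temp (the file name and values are illustrative):

import java.util.ArrayList;
import java.util.Arrays;

import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitionMapping;

public class LoadCommandSketch
{
  public static void main(String[] args)
  {
    FilePartitionMapping mapping = new FilePartitionMapping();
    mapping.setFilename("application_1/1/2014-12-10/1-transaction.out.part.0");
    mapping.setPartition(new ArrayList<String>(Arrays.asList("2014-12-10")));

    // HiveOperator.processTuple(mapping) would then execute roughly:
    //   load data inpath '<store.filepath>/application_1/1/2014-12-10/1-transaction.out.part.0'
    //   into table temp PARTITION( dt='2014-12-10' )
    // (" local" is appended after "load data" when the store's file system is not HDFS)
    System.out.println(mapping.getFilename() + " -> dt=" + mapping.getPartition().get(0));
  }
}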

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/HiveStore.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveStore.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveStore.java
new file mode 100755
index 0000000..362a824
--- /dev/null
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveStore.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+
+/**
+ * Hive Store that extends Jdbc Store and provides its own driver name.
+ *
+ * @since 2.1.0
+ */
+public class HiveStore extends JdbcStore
+{
+  public HiveStore()
+  {
+    super();
+    this.setDatabaseDriver(HIVE_DRIVER);
+  }
+
+  public static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
+  @NotNull
+  public String filepath;
+
+  public String getFilepath()
+  {
+    return filepath;
+  }
+
+  public void setFilepath(String filepath)
+  {
+    this.filepath = filepath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java
new file mode 100755
index 0000000..becab2c
--- /dev/null
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+
+/**
+ * Stream codec for uniform distribution of tuples on the upstream operator.
+ * It is used to make sure that data being sent to a particular hive partition
+ * goes to a specific operator partition, by passing the FSRollingOutputOperator
+ * to the stream codec.
+ *
+ * @since 2.1.0
+ */
+public class HiveStreamCodec<T> extends KryoSerializableStreamCodec<T> implements Externalizable
+{
+  private static final long serialVersionUID = 201412121604L;
+
+  protected AbstractFSRollingOutputOperator<T> rollingOperator;
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException
+  {
+
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    ObjectOutputStream obj = new ObjectOutputStream(os);
+    Output output = new Output(obj);
+    kryo.writeClassAndObject(output, rollingOperator);
+    byte[] outBytes = output.toBytes();
+    out.writeInt(outBytes.length);
+    out.write(outBytes, 0, outBytes.length);
+    out.flush();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+  {
+    int size = in.readInt();
+    byte[] data = new byte[size];
+    in.readFully(data);
+    Input input = new Input(data);
+    input.setBuffer(data);
+    rollingOperator = (AbstractFSRollingOutputOperator)kryo.readClassAndObject(input);
+  }
+
+  @Override
+  public int getPartition(T o)
+  {
+    return rollingOperator.getHivePartition(o).hashCode();
+  }
+
+}
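The routing guarantee described in the Javadoc follows directly from getPartition: tuples whose getHivePartition lists are equal hash to the same value and are therefore delivered to the same physical writer partition. A small sketch of that invariant (the anonymous subclass and tuples are illustrative; the class sits in the same package only so it can set the protected rollingOperator field):

package com.datatorrent.contrib.hive;

import java.util.ArrayList;
import java.util.Arrays;

public class HiveStreamCodecSketch
{
  public static void main(String[] args)
  {
    AbstractFSRollingOutputOperator<String> rolling = new AbstractFSRollingOutputOperator<String>()
    {
      @Override
      public ArrayList<String> getHivePartition(String tuple)
      {
        // partition value is the 10-character date prefix, e.g. "2014-12-10"
        return new ArrayList<String>(Arrays.asList(tuple.substring(0, 10)));
      }

      @Override
      protected byte[] getBytesForTuple(String tuple)
      {
        return (tuple + "\n").getBytes();
      }
    };

    HiveStreamCodec<String> codec = new HiveStreamCodec<String>();
    codec.rollingOperator = rolling;

    // Both tuples belong to partition 2014-12-10, so their codec partitions match.
    System.out.println(codec.getPartition("2014-12-10 a") == codec.getPartition("2014-12-10 b"));
  }
}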

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/package-info.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/package-info.java b/hive/src/main/java/com/datatorrent/contrib/hive/package-info.java
new file mode 100644
index 0000000..ad93d8f
--- /dev/null
+++ b/hive/src/main/java/com/datatorrent/contrib/hive/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
[email protected]
+package com.datatorrent.contrib.hive;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java
----------------------------------------------------------------------
diff --git a/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java
new file mode 100644
index 0000000..4f58cf4
--- /dev/null
+++ b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.hive;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import javax.annotation.Nonnull;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.commons.beanutils.converters.ArrayConverter;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.contrib.hive.FSPojoToHiveOperator;
+import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;
+import com.datatorrent.contrib.hive.HiveOperator;
+import com.datatorrent.contrib.hive.HiveStore;
+
+/**
+ * HiveOutputModule provides an abstraction over the operators needed for writing
+ * tuples to hive. This module is expanded into an FSPojoToHiveOperator and a
+ * HiveOperator in the physical plan.
+ */
[email protected]
+public class HiveOutputModule implements Module
+{
+
+  /**
+   * The path of the directory to where files are written.
+   */
+  @NotNull
+  private String filePath;
+
+  /**
+   * Names of the columns in hive table (excluding partitioning columns).
+   */
+  private String[] hiveColumns;
+
+  /**
+   * Data types of the columns in hive table (excluding partitioning columns).
+   * This sequence should match the order of the fields in hiveColumns.
+   */
+  private FIELD_TYPE[] hiveColumnDataTypes;
+
+  /**
+   * Expressions for the hive columns (excluding partitioning columns). This
+   * sequence should match to the fields in hiveColumnDataTypes
+   */
+  private String[] expressionsForHiveColumns;
+
+  /**
+   * Names of the columns on which hive data should be partitioned
+   */
+  private String[] hivePartitionColumns;
+
+  /**
+   * Data types of the columns on which hive data should be partitioned. This
+   * sequence should match to the fields in hivePartitionColumns
+   */
+  private FIELD_TYPE[] hivePartitionColumnDataTypes;
+
+  /**
+   * Expressions for the hive partition columns. This sequence should match to
+   * the fields in hivePartitionColumns
+   */
+  private String[] expressionsForHivePartitionColumns;
+
+  /**
+   * The maximum length in bytes of a rolling file. Default value is 128MB.
+   */
+  @Min(1)
+  protected Long maxLength = 134217728L;
+
+  /**
+   * Connection URL for connecting to hive.
+   */
+  @NotNull
+  private String databaseUrl;
+
+  /**
+   * Driver for connecting to hive.
+   */
+  @NotNull
+  private String databaseDriver;
+
+  /**
+   * Username for connecting to hive
+   */
+  private String userName;
+
+  /**
+   * Password for connecting to hive
+   */
+  private String password;
+
+  /**
+   * Table name for writing data into hive
+   */
+  @Nonnull
+  protected String tablename;
+
+  /**
+   * Input port for files metadata.
+   */
+  public final transient ProxyInputPort<Object> input = new ProxyInputPort<Object>();
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    FSPojoToHiveOperator fsRolling = dag.addOperator("fsRolling", new FSPojoToHiveOperator());
+    HiveOperator hiveOperator = dag.addOperator("HiveOperator", new HiveOperator());
+
+    input.set(fsRolling.input);
+    dag.addStream("toHive", fsRolling.outputPort, hiveOperator.input);
+    fsRolling.setFilePath(filePath);
+    fsRolling.setHiveColumns(new ArrayList<String>(Arrays.asList(hiveColumns)));
+    fsRolling.setHiveColumnDataTypes(new ArrayList<FIELD_TYPE>(Arrays.asList(hiveColumnDataTypes)));
+    fsRolling.setExpressionsForHiveColumns(new ArrayList<String>(Arrays.asList(expressionsForHiveColumns)));
+
+    fsRolling.setHivePartitionColumns(new ArrayList<String>(Arrays.asList(hivePartitionColumns)));
+    fsRolling.setHivePartitionColumnDataTypes(new ArrayList<FIELD_TYPE>(Arrays.asList(hivePartitionColumnDataTypes)));
+    fsRolling.setExpressionsForHivePartitionColumns(
+        new ArrayList<String>(Arrays.asList(expressionsForHivePartitionColumns)));
+
+    fsRolling.setMaxLength(maxLength);
+    fsRolling.setAlwaysWriteToTmp(true);
+    fsRolling.setRotationWindows(0);
+
+    hiveOperator.setHivePartitionColumns(new ArrayList<String>(Arrays.asList(hivePartitionColumns)));
+    HiveStore hiveStore = hiveOperator.getStore();
+    hiveStore.setFilepath(filePath);
+    hiveStore.setDatabaseUrl(databaseUrl);
+    hiveStore.setDatabaseDriver(databaseDriver);
+    hiveStore.getConnectionProperties().put("user", userName);
+    if (password != null) {
+      hiveStore.getConnectionProperties().put("password", password);
+    }
+    hiveOperator.setTablename(tablename);
+  }
+
+  /**
+   * The path of the directory to where files are written.
+   * 
+   * @return file path
+   */
+  public String getFilePath()
+  {
+    return filePath;
+  }
+
+  /**
+   * The path of the directory to where files are written.
+   * 
+   * @param filePath
+   *          file path
+   */
+  public void setFilePath(String filePath)
+  {
+    this.filePath = filePath;
+  }
+
+  /**
+   * Names of the columns in hive table (excluding partitioning columns).
+   * 
+   * @return Hive column names
+   */
+  public String[] getHiveColumns()
+  {
+    return hiveColumns;
+  }
+
+  /**
+   * Names of the columns in hive table (excluding partitioning columns).
+   * 
+   * @param hiveColumns
+   *          Hive column names
+   */
+  public void setHiveColumns(String[] hiveColumns)
+  {
+    this.hiveColumns = hiveColumns;
+  }
+
+  /**
+   * Data types of the columns in hive table (excluding partitioning columns).
+   * This sequence should match the order of the fields in hiveColumns.
+   * 
+   * @return Hive column data types
+   */
+  public FIELD_TYPE[] getHiveColumnDataTypes()
+  {
+    return hiveColumnDataTypes;
+  }
+
+  /**
+   * Data types of the columns in hive table (excluding partitioning columns).
+   * This sequence should match the order of the fields in hiveColumns.
+   * 
+   * @param hiveColumnDataTypes
+   *          Hive column data types
+   */
+  public void setHiveColumnDataTypes(FIELD_TYPE[] hiveColumnDataTypes)
+  {
+    this.hiveColumnDataTypes = hiveColumnDataTypes;
+  }
+
+  /**
+   * Expressions for the hive columns (excluding partitioning columns). This
+   * sequence should match to the fields in hiveColumnDataTypes
+   * 
+   * @return Expressions for hive columns
+   */
+  public String[] getExpressionsForHiveColumns()
+  {
+    return expressionsForHiveColumns;
+  }
+
+  /**
+   * Expressions for the hive columns (excluding partitioning columns). This
+   * sequence should match to the fields in hiveColumnDataTypes
+   * 
+   * @param expressionsForHiveColumns
+   */
+  public void setExpressionsForHiveColumns(String[] expressionsForHiveColumns)
+  {
+    this.expressionsForHiveColumns = expressionsForHiveColumns;
+  }
+
+  /**
+   * Names of the columns on which hive data should be partitioned
+   * 
+   * @return hive partition columns
+   */
+  public String[] getHivePartitionColumns()
+  {
+    return hivePartitionColumns;
+  }
+
+  /**
+   * Names of the columns on which hive data should be partitioned
+   * 
+   * @param hivePartitionColumns
+   *          Hive partition columns
+   */
+  public void setHivePartitionColumns(String[] hivePartitionColumns)
+  {
+    this.hivePartitionColumns = hivePartitionColumns;
+  }
+
+  /**
+   * Data types of the columns on which hive data should be partitioned. This
+   * sequence should match to the fields in hivePartitionColumns
+   * 
+   * @return Hive partition column data types
+   */
+  public FIELD_TYPE[] getHivePartitionColumnDataTypes()
+  {
+    return hivePartitionColumnDataTypes;
+  }
+
+  /**
+   * Data types of the columns on which hive data should be partitioned. This
+   * sequence should match to the fields in hivePartitionColumns
+   * 
+   * @param hivePartitionColumnDataTypes
+   *          Hive partition column data types
+   */
+  public void setHivePartitionColumnDataTypes(FIELD_TYPE[] hivePartitionColumnDataTypes)
+  {
+    this.hivePartitionColumnDataTypes = hivePartitionColumnDataTypes;
+  }
+
+  /**
+   * Expressions for the hive partition columns. This sequence should match to
+   * the fields in hivePartitionColumns
+   * 
+   * @return Expressions for hive partition columns
+   */
+  public String[] getExpressionsForHivePartitionColumns()
+  {
+    return expressionsForHivePartitionColumns;
+  }
+
+  /**
+   * Expressions for the hive partition columns. This sequence should match to
+   * the fields in hivePartitionColumns
+   * 
+   * @param expressionsForHivePartitionColumns
+   *          Expressions for hive partition columns
+   */
+  public void setExpressionsForHivePartitionColumns(String[] expressionsForHivePartitionColumns)
+  {
+    this.expressionsForHivePartitionColumns = expressionsForHivePartitionColumns;
+  }
+
+  /**
+   * The maximum length in bytes of a rolling file.
+   * 
+   * @return maximum size of file
+   */
+  public Long getMaxLength()
+  {
+    return maxLength;
+  }
+
+  /**
+   * The maximum length in bytes of a rolling file.
+   * 
+   * @param maxLength
+   *          maximum size of file
+   */
+  public void setMaxLength(Long maxLength)
+  {
+    this.maxLength = maxLength;
+  }
+
+  /**
+   * Connection URL for connecting to hive.
+   * 
+   * @return database url
+   */
+  public String getDatabaseUrl()
+  {
+    return databaseUrl;
+  }
+
+  /**
+   * Connection URL for connecting to hive.
+   * 
+   * @param databaseUrl
+   *          database url
+   */
+  public void setDatabaseUrl(String databaseUrl)
+  {
+    this.databaseUrl = databaseUrl;
+  }
+
+  /**
+   * Driver for connecting to hive.
+   * 
+   * @return database driver
+   */
+  public String getDatabaseDriver()
+  {
+    return databaseDriver;
+  }
+
+  /**
+   * Driver for connecting to hive.
+   * 
+   * @param databaseDriver
+   *          database driver
+   */
+  public void setDatabaseDriver(String databaseDriver)
+  {
+    this.databaseDriver = databaseDriver;
+  }
+
+  /**
+   * Username for connecting to hive
+   * 
+   * @return user name
+   */
+  public String getUserName()
+  {
+    return userName;
+  }
+
+  /**
+   * Username for connecting to hive
+   * 
+   * @param userName
+   *          user name
+   */
+  public void setUserName(String userName)
+  {
+    this.userName = userName;
+  }
+
+  /**
+   * Password for connecting to hive
+   * 
+   * @return password
+   */
+  public String getPassword()
+  {
+    return password;
+  }
+
+  /**
+   * Password for connecting to hive
+   * 
+   * @param password
+   *          password
+   */
+  public void setPassword(String password)
+  {
+    this.password = password;
+  }
+
+  /**
+   * Table name for writing data into hive
+   * 
+   * @return table name
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  /**
+   * Table name for writing data into hive
+   * 
+   * @param tablename
+   *          table name
+   */
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  static {
+    //Code for enabling BeanUtils to accept comma separated string to initialize FIELD_TYPE[]
+    class FieldTypeConvertor extends AbstractConverter
+    {
+
+      @Override
+      protected Object convertToType(Class type, Object value) throws Throwable
+      {
+        if (value instanceof String) {
+
+          return FIELD_TYPE.valueOf((String)value);
+        } else {
+          throw new IllegalArgumentException("FIELD_TYPE should be specified as String");
+        }
+      }
+
+      @Override
+      protected Class getDefaultType()
+      {
+        return FIELD_TYPE.class;
+      }
+    }
+
+    class FieldTypeArrayConvertor extends ArrayConverter
+    {
+
+      public FieldTypeArrayConvertor()
+      {
+        super(FIELD_TYPE[].class, new FieldTypeConvertor());
+      }
+    }
+
+    ConvertUtils.register(new FieldTypeConvertor(), FIELD_TYPE.class);
+    ConvertUtils.register(new FieldTypeConvertor(), FIELD_TYPE.class);
+    ConvertUtils.register(new FieldTypeArrayConvertor(), FIELD_TYPE[].class);
+
+    ConvertUtils.lookup(FIELD_TYPE.class).getClass();
+  }
+
+}
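Because the module exposes plain properties (including FIELD_TYPE[], thanks to the BeanUtils converters registered in the static block above), it can be configured through setters or through a properties file. A minimal sketch using setters, assuming dag.addModule is available in the engine version in use; the table, column names and expressions are illustrative assumptions:

import org.apache.apex.malhar.hive.HiveOutputModule;

import com.datatorrent.api.DAG;
import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;

public class HiveOutputModuleExample
{
  // Illustrative configuration; dag is the application's DAG instance.
  public static HiveOutputModule addHiveOutput(DAG dag)
  {
    HiveOutputModule hiveOut = dag.addModule("hiveOut", new HiveOutputModule());

    hiveOut.setFilePath("/user/apex/hive-staging");
    hiveOut.setDatabaseUrl("jdbc:hive2://localhost:10000/default");
    hiveOut.setDatabaseDriver("org.apache.hive.jdbc.HiveDriver");
    hiveOut.setTablename("employee");

    hiveOut.setHiveColumns(new String[] {"id", "name"});
    hiveOut.setHiveColumnDataTypes(new FIELD_TYPE[] {FIELD_TYPE.INTEGER, FIELD_TYPE.STRING});
    hiveOut.setExpressionsForHiveColumns(new String[] {"id", "name"});

    hiveOut.setHivePartitionColumns(new String[] {"dt"});
    hiveOut.setHivePartitionColumnDataTypes(new FIELD_TYPE[] {FIELD_TYPE.STRING});
    hiveOut.setExpressionsForHivePartitionColumns(new String[] {"dt"});

    return hiveOut;
  }
}

In a properties file the same data-type arrays can be given as comma separated strings such as INTEGER,STRING, which the registered ArrayConverter turns into a FIELD_TYPE[].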

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java b/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java
new file mode 100755
index 0000000..27b727f
--- /dev/null
+++ b/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+public class FSRollingMapTestImpl extends AbstractFSRollingOutputOperator<Map<String, Object>>
+{
+  @Override
+  public ArrayList<String> getHivePartition(Map<String, Object> tuple)
+  {
+    ArrayList<String> hivePartitions = new ArrayList<String>();
+    hivePartitions.add("2014-12-10");
+    return (hivePartitions);
+  }
+
+  @Override
+  protected byte[] getBytesForTuple(Map<String, Object> tuple)
+  {
+    Iterator<String> keyIter = tuple.keySet().iterator();
+    StringBuilder writeToHive = new StringBuilder("");
+
+    while (keyIter.hasNext()) {
+      String key = keyIter.next();
+      Object obj = tuple.get(key);
+      writeToHive.append(key).append(":").append(obj).append("\n");
+    }
+    return writeToHive.toString().getBytes();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java b/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java
new file mode 100755
index 0000000..f3bade6
--- /dev/null
+++ b/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import java.util.ArrayList;
+
+public class FSRollingTestImpl extends AbstractFSRollingOutputOperator<String>
+{
+  @Override
+  public ArrayList<String> getHivePartition(String tuple)
+  {
+    ArrayList<String> hivePartitions = new ArrayList<String>();
+    hivePartitions.add(tuple);
+    return (hivePartitions);
+  }
+
+  @Override
+  protected byte[] getBytesForTuple(String tuple)
+  {
+    return (tuple + "\n").getBytes();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
new file mode 100755
index 0000000..3c64bdf
--- /dev/null
+++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
@@ -0,0 +1,618 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.thrift.TException;
+
+import io.teknek.hiveunit.HiveTestService;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitionMapping;
+import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+
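+/**
+ * Tests for the hive output operators, run against a local HiveTestService instance.
+ */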
+public class HiveMockTest extends HiveTestService
+{
+  public static final String APP_ID = "HiveOperatorTest";
+
+  public static final int OPERATOR_ID = 0;
+  public static final int NUM_WINDOWS = 10;
+  public static final int BLAST_SIZE = 10;
+  public static final int DATABASE_SIZE = NUM_WINDOWS * BLAST_SIZE;
+  public static final String tablename = "temp";
+  public static final String tablepojo = "temppojo";
+  public static final String tablemap = "tempmap";
+  public static String delimiterMap = ":";
+  public static final String HOST = "localhost";
+  public static final String PORT = "10000";
+  public static final String DATABASE = "default";
+  public static final String HOST_PREFIX = "jdbc:hive://";
+
+  public String testdir;
+
+  private String getDir()
+  {
+    String filePath = new File("target/hive").getAbsolutePath();
+    LOG.info("filepath is {}", filePath);
+    return filePath;
+  }
+
+  @Override
+  public void setUp() throws Exception
+  {
+    super.setUp();
+  }
+
+  private static final int NUMBER_OF_TESTS = 4; // number of @Test methods in this class
+  private int sTestsRun = 0;
+
+  @Override
+  public void tearDown() throws Exception
+  {
+    super.tearDown();
+    sTestsRun++;
+    // Equivalent of @AfterClass in JUnit 3.
+    if (sTestsRun == NUMBER_OF_TESTS) {
+      FileUtils.deleteQuietly(new File(testdir));
+    }
+  }
+
+  public HiveMockTest() throws Exception
+  {
+    super();
+    testdir = getDir();
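+    // Point the embedded Derby metastore and the hive warehouse at the test directory.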
+    Properties properties = System.getProperties();
+    properties.put("derby.system.home", testdir);
+    System.setProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), 
testdir);
+    new File(testdir).mkdir();
+  }
+
+  public static HiveStore createStore(HiveStore hiveStore)
+  {
+    String host = HOST;
+    String port = PORT;
+
+    if (hiveStore == null) {
+      hiveStore = new HiveStore();
+    }
+
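+    // Connection properties are passed as a comma-separated list of key:value pairs
+    // (empty user and password here).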
+    StringBuilder sb = new StringBuilder();
+    String tempHost = HOST_PREFIX + host + ":" + port;
+
+    LOG.debug("Host name: {}", tempHost);
+    LOG.debug("Port: {}", 10000);
+
+    sb.append("user:").append("").append(",");
+    sb.append("password:").append("");
+
+    String properties = sb.toString();
+    LOG.debug(properties);
+    hiveStore.setDatabaseDriver("org.apache.hive.jdbc.HiveDriver");
+
+    hiveStore.setDatabaseUrl("jdbc:hive2://");
+    hiveStore.setConnectionProperties(properties);
+    return hiveStore;
+  }
+
+  public static void hiveInitializeDatabase(HiveStore hiveStore) throws 
SQLException
+  {
+    hiveStore.connect();
+    Statement stmt = hiveStore.getConnection().createStatement();
+    // show tables
+    String sql = "show tables";
+
+    LOG.debug(sql);
+    ResultSet res = stmt.executeQuery(sql);
+    if (res.next()) {
+      LOG.debug("tables are {}", res.getString(1));
+    }
+
+    stmt.execute("DROP TABLE " + tablename);
+
+    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 String) 
PARTITIONED BY(dt STRING) "
+        + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY 
'\n'  \n" + "STORED AS TEXTFILE ");
+    /*ResultSet res = stmt.execute("CREATE TABLE IF NOT EXISTS temp4 (col1 
map<string,int>,col2 map<string,int>,col3  map<string,int>,col4 
map<String,timestamp>, col5 map<string,double>,col6 map<string,double>,col7 
map<string,int>,col8 map<string,int>) ROW FORMAT DELIMITED FIELDS TERMINATED BY 
','  \n"
+     + "COLLECTION ITEMS TERMINATED BY '\n'  \n"
+     + "MAP KEYS TERMINATED BY ':'  \n"
+     + "LINES TERMINATED BY '\n' "
+     + "STORED AS TEXTFILE");*/
+
+    hiveStore.disconnect();
+  }
+
+  public static void hiveInitializePOJODatabase(HiveStore hiveStore) throws 
SQLException
+  {
+    hiveStore.connect();
+    Statement stmt = hiveStore.getConnection().createStatement();
+    // show tables
+    String sql = "show tables";
+
+    LOG.debug(sql);
+    ResultSet res = stmt.executeQuery(sql);
+    if (res.next()) {
+      LOG.debug("tables are {}", res.getString(1));
+    }
+    stmt.execute("DROP TABLE " + tablepojo);
+
+    stmt.execute("CREATE TABLE " + tablepojo + " (col1 int) PARTITIONED BY(dt 
STRING) ROW FORMAT DELIMITED "
+        + "FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' \n" + "STORED AS 
TEXTFILE ");
+    hiveStore.disconnect();
+  }
+
+  public static void hiveInitializeMapDatabase(HiveStore hiveStore) throws 
SQLException
+  {
+    hiveStore.connect();
+    Statement stmt = hiveStore.getConnection().createStatement();
+    // show tables
+    String sql = "show tables";
+
+    LOG.debug(sql);
+    ResultSet res = stmt.executeQuery(sql);
+    if (res.next()) {
+      LOG.debug(res.getString(1));
+    }
+
+    stmt.execute("DROP TABLE " + tablemap);
+
+    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablemap
+        + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT 
DELIMITED FIELDS TERMINATED BY '\t'  \n"
+        + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n" + "STORED AS 
TEXTFILE ");
+    hiveStore.disconnect();
+  }
+
+  @Test
+  public void testInsertString() throws Exception
+  {
+    HiveStore hiveStore = createStore(null);
+    hiveStore.setFilepath(testdir);
+    ArrayList<String> hivePartitionColumns = new ArrayList<String>();
+    hivePartitionColumns.add("dt");
+    hiveInitializeDatabase(createStore(null));
+    HiveOperator hiveOperator = new HiveOperator();
+    hiveOperator.setStore(hiveStore);
+    hiveOperator.setTablename(tablename);
+    hiveOperator.setHivePartitionColumns(hivePartitionColumns);
+
+    FSRollingTestImpl fsRolling = new FSRollingTestImpl();
+    fsRolling.setFilePath(testdir);
+    short permission = 511;
+    fsRolling.setFilePermission(permission);
+    fsRolling.setAlwaysWriteToTmp(false);
+    fsRolling.setMaxLength(128);
+    AttributeMap.DefaultAttributeMap attributeMap = new 
AttributeMap.DefaultAttributeMap();
+    attributeMap.put(OperatorContext.PROCESSING_MODE, 
ProcessingMode.AT_LEAST_ONCE);
+    attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    fsRolling.setup(context);
+    hiveOperator.setup(context);
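+    // Map each rolled output file to the hive partition it should be loaded into.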
+    FilePartitionMapping mapping1 = new FilePartitionMapping();
+    FilePartitionMapping mapping2 = new FilePartitionMapping();
+    mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-10" + "/" 
+ "0-transaction.out.part.0");
+    ArrayList<String> partitions1 = new ArrayList<String>();
+    partitions1.add("2014-12-10");
+    mapping1.setPartition(partitions1);
+    ArrayList<String> partitions2 = new ArrayList<String>();
+    partitions2.add("2014-12-11");
+    mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" 
+ "0-transaction.out.part.0");
+    mapping2.setPartition(partitions2);
+    for (int wid = 0, total = 0; wid < NUM_WINDOWS; wid++) {
+      fsRolling.beginWindow(wid);
+      for (int tupleCounter = 0; tupleCounter < BLAST_SIZE && total < 
DATABASE_SIZE; tupleCounter++, total++) {
+        fsRolling.input.process("2014-12-1" + tupleCounter);
+      }
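+      // At window 7, mark the previous window committed so the rolled files are
+      // finalized, then pass the file-to-partition mappings to the hive operator.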
+      if (wid == 7) {
+        fsRolling.committed(wid - 1);
+        hiveOperator.processTuple(mapping1);
+        hiveOperator.processTuple(mapping2);
+      }
+
+      fsRolling.endWindow();
+    }
+
+    fsRolling.teardown();
+    hiveStore.connect();
+    client.execute("select * from " + tablename + " where dt='2014-12-10'");
+    List<String> recordsInDatePartition1 = client.fetchAll();
+
+    client.execute("select * from " + tablename + " where dt='2014-12-11'");
+    List<String> recordsInDatePartition2 = client.fetchAll();
+    client.execute("drop table " + tablename);
+    hiveStore.disconnect();
+
+    Assert.assertEquals(7, recordsInDatePartition1.size());
+    for (int i = 0; i < recordsInDatePartition1.size(); i++) {
+      LOG.debug("records in first date partition are {}", 
recordsInDatePartition1.get(i));
+      /* Each record is returned as an Object array holding the column data and the
+         partition value, so cast it to Object[] before asserting on its fields. */
+      Object record = recordsInDatePartition1.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals("2014-12-10", records[1]);
+    }
+    Assert.assertEquals(7, recordsInDatePartition2.size());
+    for (int i = 0; i < recordsInDatePartition2.size(); i++) {
+      LOG.debug("records in second date partition are {}", 
recordsInDatePartition2.get(i));
+      Object record = recordsInDatePartition2.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals("2014-12-11", records[1]);
+    }
+  }
+
+  @Test
+  public void testInsertPOJO() throws Exception
+  {
+    HiveStore hiveStore = createStore(null);
+    hiveStore.setFilepath(testdir);
+    ArrayList<String> hivePartitionColumns = new ArrayList<String>();
+    hivePartitionColumns.add("dt");
+    ArrayList<String> hiveColumns = new ArrayList<String>();
+    hiveColumns.add("col1");
+    hiveInitializePOJODatabase(createStore(null));
+    HiveOperator hiveOperator = new HiveOperator();
+    hiveOperator.setStore(hiveStore);
+    hiveOperator.setTablename(tablepojo);
+    hiveOperator.setHivePartitionColumns(hivePartitionColumns);
+
+    FSPojoToHiveOperator fsRolling = new FSPojoToHiveOperator();
+    fsRolling.setFilePath(testdir);
+    fsRolling.setHiveColumns(hiveColumns);
+    ArrayList<FIELD_TYPE> fieldtypes = new ArrayList<FIELD_TYPE>();
+    ArrayList<FIELD_TYPE> partitiontypes = new ArrayList<FIELD_TYPE>();
+    fieldtypes.add(FIELD_TYPE.INTEGER);
+    partitiontypes.add(FIELD_TYPE.STRING);
+    fsRolling.setHiveColumnDataTypes(fieldtypes);
+    fsRolling.setHivePartitionColumnDataTypes(partitiontypes);
+    //ArrayList<FIELD_TYPE> partitionColumnType = new ArrayList<FIELD_TYPE>();
+    //partitionColumnType.add(FIELD_TYPE.STRING);
+    fsRolling.setHivePartitionColumns(hivePartitionColumns);
+    // fsRolling.setHivePartitionColumnsDataTypes(partitionColumnType);
+    ArrayList<String> expressions = new ArrayList<String>();
+    expressions.add("getId()");
+    ArrayList<String> expressionsPartitions = new ArrayList<String>();
+
+    expressionsPartitions.add("getDate()");
+    short permission = 511;
+    fsRolling.setFilePermission(permission);
+    fsRolling.setAlwaysWriteToTmp(false);
+    fsRolling.setMaxLength(128);
+    fsRolling.setExpressionsForHiveColumns(expressions);
+    fsRolling.setExpressionsForHivePartitionColumns(expressionsPartitions);
+    AttributeMap.DefaultAttributeMap attributeMap = new 
AttributeMap.DefaultAttributeMap();
+    attributeMap.put(OperatorContext.PROCESSING_MODE, 
ProcessingMode.AT_LEAST_ONCE);
+    attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    fsRolling.setup(context);
+    hiveOperator.setup(context);
+    FilePartitionMapping mapping1 = new FilePartitionMapping();
+    FilePartitionMapping mapping2 = new FilePartitionMapping();
+    mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" 
+ "0-transaction.out.part.0");
+    ArrayList<String> partitions1 = new ArrayList<String>();
+    partitions1.add("2014-12-11");
+    mapping1.setPartition(partitions1);
+    ArrayList<String> partitions2 = new ArrayList<String>();
+    partitions2.add("2014-12-12");
+    mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-12" + "/" 
+ "0-transaction.out.part.0");
+    mapping2.setPartition(partitions2);
+    for (int wid = 0, total = 0; wid < NUM_WINDOWS; wid++) {
+      fsRolling.beginWindow(wid);
+      for (int tupleCounter = 1; tupleCounter < BLAST_SIZE && total < 
DATABASE_SIZE; tupleCounter++, total++) {
+        InnerObj innerObj = new InnerObj();
+        innerObj.setId(tupleCounter);
+        innerObj.setDate("2014-12-1" + tupleCounter);
+        fsRolling.input.process(innerObj);
+      }
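+      // As in the string test, commit the previous window and load the finalized files.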
+      if (wid == 7) {
+        fsRolling.committed(wid - 1);
+        hiveOperator.processTuple(mapping1);
+        hiveOperator.processTuple(mapping2);
+      }
+
+      fsRolling.endWindow();
+    }
+
+    fsRolling.teardown();
+    hiveStore.connect();
+    client.execute("select * from " + tablepojo + " where dt='2014-12-11'");
+    List<String> recordsInDatePartition1 = client.fetchAll();
+
+    client.execute("select * from " + tablepojo + " where dt='2014-12-12'");
+    List<String> recordsInDatePartition2 = client.fetchAll();
+    client.execute("drop table " + tablepojo);
+    hiveStore.disconnect();
+
+    Assert.assertEquals(7, recordsInDatePartition1.size());
+    for (int i = 0; i < recordsInDatePartition1.size(); i++) {
+      LOG.debug("records in first date partition are {}", 
recordsInDatePartition1.get(i));
+      /* Each record is returned as an Object array holding the column data and the
+         partition value, so cast it to Object[] before asserting on its fields. */
+      Object record = recordsInDatePartition1.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals(1, records[0]);
+      Assert.assertEquals("2014-12-11", records[1]);
+    }
+    Assert.assertEquals(7, recordsInDatePartition2.size());
+    for (int i = 0; i < recordsInDatePartition2.size(); i++) {
+      LOG.debug("records in second date partition are {}", 
recordsInDatePartition2.get(i));
+      Object record = recordsInDatePartition2.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals(2, records[0]);
+      Assert.assertEquals("2014-12-12", records[1]);
+    }
+  }
+
+  @Test
+  public void testHiveInsertMapOperator() throws SQLException, TException
+  {
+    HiveStore hiveStore = createStore(null);
+    hiveStore.setFilepath(testdir);
+    ArrayList<String> hivePartitionColumns = new ArrayList<String>();
+    hivePartitionColumns.add("dt");
+    hiveInitializeMapDatabase(createStore(null));
+    HiveOperator hiveOperator = new HiveOperator();
+    hiveOperator.setStore(hiveStore);
+    hiveOperator.setTablename(tablemap);
+    hiveOperator.setHivePartitionColumns(hivePartitionColumns);
+
+    FSRollingMapTestImpl fsRolling = new FSRollingMapTestImpl();
+    fsRolling.setFilePath(testdir);
+    short permission = 511;
+    fsRolling.setFilePermission(permission);
+    fsRolling.setAlwaysWriteToTmp(false);
+    fsRolling.setMaxLength(128);
+    AttributeMap.DefaultAttributeMap attributeMap = new 
AttributeMap.DefaultAttributeMap();
+    attributeMap.put(OperatorContext.PROCESSING_MODE, 
ProcessingMode.AT_LEAST_ONCE);
+    attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    fsRolling.setup(context);
+    hiveOperator.setup(context);
+    HashMap<String, Object> map = new HashMap<String, Object>();
+    FilePartitionMapping mapping1 = new FilePartitionMapping();
+    FilePartitionMapping mapping2 = new FilePartitionMapping();
+    ArrayList<String> partitions1 = new ArrayList<String>();
+    partitions1.add("2014-12-10");
+    mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-10" + "/" 
+ "0-transaction.out.part.0");
+    mapping1.setPartition(partitions1);
+    ArrayList<String> partitions2 = new ArrayList<String>();
+    partitions2.add("2014-12-11");
+    mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" 
+ "0-transaction.out.part.0");
+    mapping2.setPartition(partitions2);
+    for (int wid = 0; wid < NUM_WINDOWS; wid++) {
+      fsRolling.beginWindow(wid);
+      for (int tupleCounter = 0; tupleCounter < BLAST_SIZE; tupleCounter++) {
+        map.put(2014 - 12 - 10 + "", 2014 - 12 - 10);
+        fsRolling.input.put(map);
+        map.clear();
+      }
+
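+      // Commit the previous window and load the map-formatted files into their partitions.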
+      if (wid == 7) {
+        fsRolling.committed(wid - 1);
+        hiveOperator.processTuple(mapping1);
+        hiveOperator.processTuple(mapping2);
+      }
+
+      fsRolling.endWindow();
+    }
+
+    fsRolling.teardown();
+
+    hiveStore.connect();
+
+    client.execute("select * from " + tablemap + " where dt='2014-12-10'");
+    List<String> recordsInDatePartition1 = client.fetchAll();
+
+    client.execute("drop table " + tablemap);
+    hiveStore.disconnect();
+
+    Assert.assertEquals(13, recordsInDatePartition1.size());
+    for (int i = 0; i < recordsInDatePartition1.size(); i++) {
+      LOG.debug("records in first date partition are {}", 
recordsInDatePartition1.get(i));
+      /* Each record is returned as an Object array holding the column data and the
+         partition value, so cast it to Object[] before asserting on its fields. */
+      Object record = recordsInDatePartition1.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals("2014-12-10", records[1]);
+    }
+
+  }
+
+  @Test
+  public void testHDFSHiveCheckpoint() throws SQLException, TException
+  {
+    hiveInitializeDatabase(createStore(null));
+    HiveStore hiveStore = createStore(null);
+    hiveStore.setFilepath(testdir);
+    HiveOperator outputOperator = new HiveOperator();
+    HiveOperator newOp;
+    outputOperator.setStore(hiveStore);
+    ArrayList<String> hivePartitionColumns = new ArrayList<String>();
+    hivePartitionColumns.add("dt");
+    FSRollingTestImpl fsRolling = new FSRollingTestImpl();
+    hiveInitializeDatabase(createStore(null));
+    outputOperator.setHivePartitionColumns(hivePartitionColumns);
+    outputOperator.setTablename(tablename);
+    fsRolling.setFilePath(testdir);
+    short permission = 511;
+    fsRolling.setFilePermission(permission);
+    fsRolling.setAlwaysWriteToTmp(false);
+    fsRolling.setMaxLength(128);
+    AttributeMap.DefaultAttributeMap attributeMap = new 
AttributeMap.DefaultAttributeMap();
+    attributeMap.put(OperatorContext.PROCESSING_MODE, 
ProcessingMode.AT_LEAST_ONCE);
+    attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    fsRolling.setup(context);
+
+    FilePartitionMapping mapping1 = new FilePartitionMapping();
+    FilePartitionMapping mapping2 = new FilePartitionMapping();
+    FilePartitionMapping mapping3 = new FilePartitionMapping();
+    outputOperator.setup(context);
+    mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-10" + "/" 
+ "0-transaction.out.part.0");
+    ArrayList<String> partitions1 = new ArrayList<String>();
+    partitions1.add("2014-12-10");
+    mapping1.setPartition(partitions1);
+    mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" 
+ "0-transaction.out.part.0");
+    ArrayList<String> partitions2 = new ArrayList<String>();
+    partitions2.add("2014-12-11");
+    mapping2.setPartition(partitions2);
+    ArrayList<String> partitions3 = new ArrayList<String>();
+    partitions3.add("2014-12-12");
+    mapping3.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-12" + "/" 
+ "0-transaction.out.part.0");
+    mapping3.setPartition(partitions3);
+    for (int wid = 0, total = 0; wid < NUM_WINDOWS; wid++) {
+      fsRolling.beginWindow(wid);
+      for (int tupleCounter = 0; tupleCounter < BLAST_SIZE && total < 
DATABASE_SIZE; tupleCounter++, total++) {
+        fsRolling.input.process("2014-12-1" + tupleCounter);
+      }
+      if (wid == 7) {
+        fsRolling.committed(wid - 1);
+        outputOperator.processTuple(mapping1);
+        outputOperator.processTuple(mapping2);
+      }
+
+      fsRolling.endWindow();
+
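+      // At window 9, copy the operator with Kryo (transient fields excluded) to simulate
+      // a checkpoint/restore cycle, then replay the remaining mapping on the restored copy.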
+      if (wid == 9) {
+        Kryo kryo = new Kryo();
+        FieldSerializer<HiveOperator> f1 = 
(FieldSerializer<HiveOperator>)kryo.getSerializer(HiveOperator.class);
+        FieldSerializer<HiveStore> f2 = 
(FieldSerializer<HiveStore>)kryo.getSerializer(HiveStore.class);
+
+        f1.setCopyTransient(false);
+        f2.setCopyTransient(false);
+        newOp = kryo.copy(outputOperator);
+        outputOperator.teardown();
+        newOp.setup(context);
+
+        newOp.beginWindow(7);
+        newOp.processTuple(mapping3);
+
+        newOp.endWindow();
+        newOp.teardown();
+        break;
+      }
+
+    }
+
+    hiveStore.connect();
+
+    client.execute("select * from " + tablename + " where dt='2014-12-10'");
+    List<String> recordsInDatePartition1 = client.fetchAll();
+
+    client.execute("select * from " + tablename + " where dt='2014-12-11'");
+    List<String> recordsInDatePartition2 = client.fetchAll();
+
+    client.execute("select * from " + tablename + " where dt='2014-12-12'");
+    List<String> recordsInDatePartition3 = client.fetchAll();
+
+    client.execute("drop table " + tablename);
+    hiveStore.disconnect();
+
+    Assert.assertEquals(7, recordsInDatePartition1.size());
+    for (int i = 0; i < recordsInDatePartition1.size(); i++) {
+      LOG.debug("records in first date partition are {}", 
recordsInDatePartition1.get(i));
+      /* Each record is returned as an Object array holding the column data and the
+         partition value, so cast it to Object[] before asserting on its fields. */
+      Object record = recordsInDatePartition1.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals("2014-12-10", records[1]);
+    }
+    Assert.assertEquals(7, recordsInDatePartition2.size());
+    for (int i = 0; i < recordsInDatePartition2.size(); i++) {
+      LOG.debug("records in second date partition are {}", 
recordsInDatePartition2.get(i));
+      Object record = recordsInDatePartition2.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals("2014-12-11", records[1]);
+    }
+    Assert.assertEquals(10, recordsInDatePartition3.size());
+    for (int i = 0; i < recordsInDatePartition3.size(); i++) {
+      LOG.debug("records in second date partition are {}", 
recordsInDatePartition3.get(i));
+      Object record = recordsInDatePartition3.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals("2014-12-12", records[1]);
+    }
+
+  }
+
+  public class InnerObj
+  {
+    public InnerObj()
+    {
+    }
+
+    private int id;
+    private String date;
+
+    public String getDate()
+    {
+      return date;
+    }
+
+    public void setDate(String date)
+    {
+      this.date = date;
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveMockTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java 
b/hive/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java
new file mode 100755
index 0000000..4f32899
--- /dev/null
+++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HiveStoreTest
+{
+  @Test
+  public void defaultDriverTest()
+  {
+    HiveStore hiveStore = new HiveStore();
+    Assert.assertEquals("Test default driver", HiveStore.HIVE_DRIVER, 
hiveStore.getDatabaseDriver());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 627168b..a8e50ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,6 +202,7 @@
       <id>all-modules</id>
       <modules>
         <module>kafka</module>
+        <module>hive</module>
         <module>stream</module>
         <module>benchmark</module>
         <module>apps</module>
