Repository: incubator-apex-malhar

Updated Branches:
  refs/heads/master ef12eb0cf -> 422f5d946
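The operators added here are wired the way HiveOutputModule.populateDAG (further down in this diff) wires them: an AbstractFSRollingOutputOperator implementation rolls part files on HDFS and, from the committed-window callback, emits one FilePartitionMapping per finished file; HiveOperator then turns each mapping into a "load data inpath ... into table ... partition(...)" statement. The application sketch below is illustrative only and mirrors the unit tests in this commit; the class name, the POJO accessors getId()/getDate(), the staging path, the JDBC URL and the table name are placeholder assumptions, not part of the commit.

import org.apache.hadoop.conf.Configuration;

import com.google.common.collect.Lists;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.hive.FSPojoToHiveOperator;
import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;
import com.datatorrent.contrib.hive.HiveOperator;
import com.datatorrent.contrib.hive.HiveStore;

public class HiveInsertApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    FSPojoToHiveOperator fsRolling = dag.addOperator("fsRolling", new FSPojoToHiveOperator());
    HiveOperator hiveOperator = dag.addOperator("hiveLoad", new HiveOperator());

    // rolling part files are staged under this path and later loaded into hive from there
    fsRolling.setFilePath("/user/hive/staging");
    // one getter expression per hive column / partition column, evaluated against the incoming POJO
    fsRolling.setHiveColumns(Lists.newArrayList("col1"));
    fsRolling.setHiveColumnDataTypes(Lists.newArrayList(FIELD_TYPE.INTEGER));
    fsRolling.setExpressionsForHiveColumns(Lists.newArrayList("getId()"));
    fsRolling.setHivePartitionColumns(Lists.newArrayList("dt"));
    fsRolling.setHivePartitionColumnDataTypes(Lists.newArrayList(FIELD_TYPE.STRING));
    fsRolling.setExpressionsForHivePartitionColumns(Lists.newArrayList("getDate()"));
    fsRolling.setAlwaysWriteToTmp(true);

    // HiveStore already sets the Hive JDBC driver in its constructor
    HiveStore store = hiveOperator.getStore();
    store.setFilepath("/user/hive/staging");
    store.setDatabaseUrl("jdbc:hive2://localhost:10000/default");
    hiveOperator.setTablename("temp");
    hiveOperator.setHivePartitionColumns(Lists.newArrayList("dt"));

    // an upstream POJO source connects to fsRolling.input; committed files flow on to the load step
    dag.addStream("toHive", fsRolling.outputPort, hiveOperator.input);
  }
}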
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java new file mode 100755 index 0000000..6360768 --- /dev/null +++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.hive; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ExecutionException; + +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * An implementation of FS Writer that writes text files to hdfs which are + * inserted into hive on committed window callback. HiveStreamCodec is used to + * make sure that data being sent to a particular hive partition goes to a + * specific operator partition by passing FSRollingOutputOperator to the stream + * codec. Also filename is determined uniquely for each tuple going to a + * specific hive partition. + * + * @since 2.1.0 + */ +public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T> + implements CheckpointListener +{ + private transient String outputFilePath; + protected MutableInt partNumber; + protected HashMap<Long, ArrayList<String>> mapFilenames = new HashMap<Long, ArrayList<String>>(); + protected HashMap<String, ArrayList<String>> mapPartition = new HashMap<String, ArrayList<String>>(); + protected Queue<Long> queueWindows = new LinkedList<Long>(); + protected long windowIDOfCompletedPart = Stateless.WINDOW_ID; + protected long committedWindowId = Stateless.WINDOW_ID; + private boolean isEmptyWindow; + private transient int operatorId; + private int countEmptyWindow; + private ArrayList<String> partition = new ArrayList<String>(); + + //This variable is user configurable. 
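+  //If no tuples arrive for maxWindowsWithNoData consecutive streaming windows, endWindow()
+  //rotates the current part file so that data already written becomes eligible for the
+  //committed-window callback instead of sitting in an open part indefinitely.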
+ @Min(0) + private long maxWindowsWithNoData = 100; + + /** + * The output port that will emit a POJO containing file which is committed + * and specific hive partitions in which this file should be loaded to + * HiveOperator. + */ + public final transient DefaultOutputPort<FilePartitionMapping> outputPort = new DefaultOutputPort<FilePartitionMapping>(); + + public AbstractFSRollingOutputOperator() + { + countEmptyWindow = 0; + HiveStreamCodec<T> hiveCodec = new HiveStreamCodec<T>(); + hiveCodec.rollingOperator = this; + streamCodec = hiveCodec; + } + + @Override + public void setup(OperatorContext context) + { + String appId = context.getValue(DAG.APPLICATION_ID); + operatorId = context.getId(); + outputFilePath = File.separator + appId + File.separator + operatorId; + super.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + isEmptyWindow = true; + windowIDOfCompletedPart = windowId; + } + + /* + * MapFilenames has mapping from completed file to windowId in which it got completed. + * Also maintaining a queue of these windowIds which helps in reducing iteration time in committed callback. + */ + @Override + protected void rotateHook(String finishedFile) + { + isEmptyWindow = false; + if (mapFilenames.containsKey(windowIDOfCompletedPart)) { + mapFilenames.get(windowIDOfCompletedPart).add(finishedFile); + } else { + ArrayList<String> listFileNames = new ArrayList<String>(); + listFileNames.add(finishedFile); + mapFilenames.put(windowIDOfCompletedPart, listFileNames); + } + queueWindows.add(windowIDOfCompletedPart); + + } + + /* + * Filenames include operator Id and the specific hive partitions to which the file will be loaded. + * Partition is determined based on tuple and its implementation is left to user. + */ + @Override + protected String getFileName(T tuple) + { + isEmptyWindow = false; + partition = getHivePartition(tuple); + StringBuilder output = new StringBuilder(outputFilePath); + int numPartitions = partition.size(); + if (numPartitions > 0) { + for (int i = 0; i < numPartitions; i++) { + output.append(File.separator).append(partition.get(i)); + } + output.append(File.separator).append(operatorId).append("-transaction.out.part"); + String partFile = getPartFileNamePri(output.toString()); + mapPartition.put(partFile, partition); + } + return output.toString(); + } + + /* + * Moving completed files into hive on committed window callback. + * Criteria for moving them is that the windowId in which they are completed + * should be less than committed window. 
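+   * In the code below this means windowId <= committedWindowId: everything up to and including
+   * the committed window is durable, so those files are safe to load into hive.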
+ */ + @Override + public void committed(long windowId) + { + committedWindowId = windowId; + Iterator<Long> iterWindows = queueWindows.iterator(); + ArrayList<String> list = new ArrayList<String>(); + while (iterWindows.hasNext()) { + windowId = iterWindows.next(); + if (committedWindowId >= windowId) { + logger.debug("list is {}", mapFilenames.get(windowId)); + list = mapFilenames.get(windowId); + FilePartitionMapping partMap = new FilePartitionMapping(); + if (list != null) { + for (int i = 0; i < list.size(); i++) { + partMap.setFilename(list.get(i)); + partMap.setPartition(mapPartition.get(list.get(i))); + outputPort.emit(partMap); + } + } + mapFilenames.remove(windowId); + iterWindows.remove(); + } + if (committedWindowId < windowId) { + break; + } + } + } + + @Override + public void checkpointed(long windowId) + { + } + + protected void rotateCall(String lastFile) + { + try { + this.rotate(lastFile); + } catch (IOException ex) { + logger.debug(ex.getMessage()); + DTThrowable.rethrow(ex); + } catch (ExecutionException ex) { + logger.debug(ex.getMessage()); + DTThrowable.rethrow(ex); + } + } + + public String getHDFSRollingLastFile() + { + Iterator<String> iterFileNames = this.openPart.keySet().iterator(); + String lastFile = null; + if (iterFileNames.hasNext()) { + lastFile = iterFileNames.next(); + partNumber = this.openPart.get(lastFile); + } + return getPartFileName(lastFile, partNumber.intValue()); + } + + /** + * This method gets a List of Hive Partitions in which the tuple needs to be + * written to. Example: If hive partitions are date='2014-12-12',country='USA' + * then this method returns {"2014-12-12","USA"} The implementation is left to + * the user. + * + * @param tuple + * A received tuple to be written to a hive partition. + * @return ArrayList containing hive partition values. + */ + public abstract ArrayList<String> getHivePartition(T tuple); + + @Override + public void endWindow() + { + if (isEmptyWindow) { + countEmptyWindow++; + } + if (countEmptyWindow >= maxWindowsWithNoData) { + String lastFile = getHDFSRollingLastFile(); + rotateCall(lastFile); + countEmptyWindow = 0; + } + super.endWindow(); + + } + + public long getMaxWindowsWithNoData() + { + return maxWindowsWithNoData; + } + + public void setMaxWindowsWithNoData(long maxWindowsWithNoData) + { + this.maxWindowsWithNoData = maxWindowsWithNoData; + } + + /* + * A POJO which is emitted by output port of AbstractFSRollingOutputOperator implementation in DAG. + * The POJO contains the filename which will not be changed by FSRollingOutputOperator once its emitted. + * The POJO also contains the hive partitions to which the respective files will be moved. 
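+   * Example (matching the unit tests): filename
+   * "<appId>/<operatorId>/2014-12-10/0-transaction.out.part.0" with partition {"2014-12-10"}.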
+ */ + public static class FilePartitionMapping + { + private String filename; + private ArrayList<String> partition = new ArrayList<String>(); + + public ArrayList<String> getPartition() + { + return partition; + } + + public void setPartition(ArrayList<String> partition) + { + this.partition = partition; + } + + public String getFilename() + { + return filename; + } + + public void setFilename(String filename) + { + this.filename = filename; + } + + } + + private static final Logger logger = LoggerFactory.getLogger(AbstractFSRollingOutputOperator.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java new file mode 100644 index 0000000..eaae68f --- /dev/null +++ b/hive/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.hive; + +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; + +import com.google.common.collect.Lists; + +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; +import com.datatorrent.lib.util.PojoUtils.GetterBoolean; +import com.datatorrent.lib.util.PojoUtils.GetterChar; +import com.datatorrent.lib.util.PojoUtils.GetterDouble; +import com.datatorrent.lib.util.PojoUtils.GetterFloat; +import com.datatorrent.lib.util.PojoUtils.GetterInt; +import com.datatorrent.lib.util.PojoUtils.GetterLong; +import com.datatorrent.lib.util.PojoUtils.GetterShort; + +/** + * An Implementation of AbstractFSRollingOutputOperator which takes any POJO as + * input, serializes the POJO as Hive delimiter separated values which are + * written in text files to hdfs, and are inserted into hive on committed window + * callback.This operator can handle outputting to multiple files when the + * output file depends on the tuple. 
+ * + * @displayName: FS To Hive Operator + * @category Output + * @tags fs, hive, database + * @since 3.0.0 + */ +public class FSPojoToHiveOperator extends AbstractFSRollingOutputOperator<Object> +{ + private static final long serialVersionUID = 1L; + private ArrayList<String> hivePartitionColumns; + private ArrayList<String> hiveColumns; + private ArrayList<FIELD_TYPE> hiveColumnDataTypes; + private ArrayList<FIELD_TYPE> hivePartitionColumnDataTypes; + private transient ArrayList<Object> getters = Lists.newArrayList(); + private ArrayList<String> expressionsForHiveColumns; + private ArrayList<String> expressionsForHivePartitionColumns; + + public ArrayList<String> getExpressionsForHivePartitionColumns() + { + return expressionsForHivePartitionColumns; + } + + public void setExpressionsForHivePartitionColumns(ArrayList<String> expressionsForHivePartitionColumns) + { + this.expressionsForHivePartitionColumns = expressionsForHivePartitionColumns; + } + + /* + * A list of Java expressions in which each expression yields the specific table column value and partition column value in hive table from the input POJO. + */ + public ArrayList<String> getExpressionsForHiveColumns() + { + return expressionsForHiveColumns; + } + + public void setExpressionsForHiveColumns(ArrayList<String> expressions) + { + this.expressionsForHiveColumns = expressions; + } + + @SuppressWarnings("unchecked") + private void getValue(Object tuple, int index, FIELD_TYPE type, StringBuilder value) + { + switch (type) { + case CHARACTER: + value.append(((GetterChar<Object>)getters.get(index)).get(tuple)); + break; + case STRING: + value.append(((Getter<Object, String>)getters.get(index)).get(tuple)); + break; + case BOOLEAN: + value.append(((GetterBoolean<Object>)getters.get(index)).get(tuple)); + break; + case SHORT: + value.append(((GetterShort<Object>)getters.get(index)).get(tuple)); + break; + case INTEGER: + value.append(((GetterInt<Object>)getters.get(index)).get(tuple)); + break; + case LONG: + value.append(((GetterLong<Object>)getters.get(index)).get(tuple)); + break; + case FLOAT: + value.append(((GetterFloat<Object>)getters.get(index)).get(tuple)); + break; + case DOUBLE: + value.append(((GetterDouble<Object>)getters.get(index)).get(tuple)); + break; + case DATE: + value.append(((Getter<Object, Date>)getters.get(index)).get(tuple)); + break; + case TIMESTAMP: + value.append(((Getter<Object, Timestamp>)getters.get(index)).get(tuple)); + break; + case OTHER: + value.append(((Getter<Object, Object>)getters.get(index)).get(tuple)); + break; + default: + throw new RuntimeException("unsupported data type " + type); + } + } + + public static enum FIELD_TYPE + { + BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE, TIMESTAMP, OTHER + } + + /* + * Columns in Hive table. + */ + public ArrayList<String> getHiveColumns() + { + return hiveColumns; + } + + public void setHiveColumns(ArrayList<String> hiveColumns) + { + this.hiveColumns = hiveColumns; + } + + /* + * Partition Columns in Hive table.Example: dt for date,ts for timestamp + */ + public ArrayList<String> getHivePartitionColumns() + { + return hivePartitionColumns; + } + + public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns) + { + this.hivePartitionColumns = hivePartitionColumns; + } + + /* + * Data Types of Hive table data columns. + * Example: If the Hive table has two columns of data type int and float, + * then hiveColumnsDataTypes = {INTEGER,FLOAT}. 
+ * Particular Data Type can be chosen from the List of data types provided. + */ + public ArrayList<FIELD_TYPE> getHiveColumnDataTypes() + { + return hiveColumnDataTypes; + } + + public void setHiveColumnDataTypes(ArrayList<FIELD_TYPE> hiveColumnDataTypes) + { + this.hiveColumnDataTypes = hiveColumnDataTypes; + } + + /* + * Data Types of Hive table Partition Columns. + * Example: If the Hive table has two columns of data type int and float and is partitioned by date of type String, + * then hivePartitionColumnDataTypes = {STRING}. + * Particular Data Type can be chosen from the List of data types provided. + */ + public ArrayList<FIELD_TYPE> getHivePartitionColumnDataTypes() + { + return hivePartitionColumnDataTypes; + } + + public void setHivePartitionColumnDataTypes(ArrayList<FIELD_TYPE> hivePartitionColumnDataTypes) + { + this.hivePartitionColumnDataTypes = hivePartitionColumnDataTypes; + } + + @Override + @SuppressWarnings("unchecked") + public ArrayList<String> getHivePartition(Object tuple) + { + if (getters.isEmpty()) { + processFirstTuple(tuple); + } + + int sizeOfColumns = hiveColumns.size(); + int sizeOfPartitionColumns = hivePartitionColumns.size(); + //int size = sizeOfColumns + sizeOfPartitionColumns; + ArrayList<String> hivePartitionColumnValues = new ArrayList<String>(); + String partitionColumnValue; + for (int i = 0; i < sizeOfPartitionColumns; i++) { + FIELD_TYPE type = hivePartitionColumnDataTypes.get(i); + StringBuilder result = new StringBuilder(); + getValue(tuple, sizeOfColumns + i, type, result); + partitionColumnValue = result.toString(); + //partitionColumnValue = (String)getters.get(i).get(tuple); + hivePartitionColumnValues.add(partitionColumnValue); + } + return hivePartitionColumnValues; + } + + @Override + public void processTuple(Object tuple) + { + if (getters.isEmpty()) { + processFirstTuple(tuple); + } + super.processTuple(tuple); + } + + @SuppressWarnings("unchecked") + public void processFirstTuple(Object tuple) + { + Class<?> fqcn = tuple.getClass(); + createGetters(fqcn, hiveColumns.size(), expressionsForHiveColumns, hiveColumnDataTypes); + createGetters(fqcn, hivePartitionColumns.size(), expressionsForHivePartitionColumns, hivePartitionColumnDataTypes); + } + + protected void createGetters(Class<?> fqcn, int size, ArrayList<String> expressions, + ArrayList<FIELD_TYPE> columnDataTypes) + { + for (int i = 0; i < size; i++) { + FIELD_TYPE type = columnDataTypes.get(i); + final Object getter; + final String getterExpression = expressions.get(i); + switch (type) { + case CHARACTER: + getter = PojoUtils.createGetterChar(fqcn, getterExpression); + break; + case STRING: + getter = PojoUtils.createGetter(fqcn, getterExpression, String.class); + break; + case BOOLEAN: + getter = PojoUtils.createGetterBoolean(fqcn, getterExpression); + break; + case SHORT: + getter = PojoUtils.createGetterShort(fqcn, getterExpression); + break; + case INTEGER: + getter = PojoUtils.createGetterInt(fqcn, getterExpression); + break; + case LONG: + getter = PojoUtils.createGetterLong(fqcn, getterExpression); + break; + case FLOAT: + getter = PojoUtils.createGetterFloat(fqcn, getterExpression); + break; + case DOUBLE: + getter = PojoUtils.createGetterDouble(fqcn, getterExpression); + break; + case DATE: + getter = PojoUtils.createGetter(fqcn, getterExpression, Date.class); + break; + case TIMESTAMP: + getter = PojoUtils.createGetter(fqcn, getterExpression, Timestamp.class); + break; + default: + getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class); + } + + 
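+      //cache the generated getter; the same instance is reused for every tuple in
+      //getValue() and getBytesForTuple(), so each expression is only processed here, once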
getters.add(getter); + + } + + } + + @Override + @SuppressWarnings("unchecked") + protected byte[] getBytesForTuple(Object tuple) + { + int size = hiveColumns.size(); + StringBuilder result = new StringBuilder(); + for (int i = 0; i < size; i++) { + FIELD_TYPE type = hiveColumnDataTypes.get(i); + getValue(tuple, i, type, result); + result.append("\t"); + } + result.append("\n"); + return (result.toString()).getBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java new file mode 100755 index 0000000..8e3b143 --- /dev/null +++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.hive; + +import java.io.IOException; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +import javax.annotation.Nonnull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitionMapping; +import com.datatorrent.lib.counters.BasicCounters; +import com.datatorrent.lib.db.AbstractStoreOutputOperator; + +/** + * Hive operator which can insert data in txt format in tables/partitions from a + * file written in hdfs location. The file contains data of the same data type + * as the hive tables created by user and is already committed. No changes will + * be made to the input file once its given to HiveOperator. This is a fault + * tolerant implementation of HiveOperator which assumes that load operation is + * an atomic operation in Hive. + * + * @category Output + * @tags database, sql, hive + * + * @since 2.1.0 + */ +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMapping, HiveStore> +{ + //This Property is user configurable. + protected ArrayList<String> hivePartitionColumns = new ArrayList<String>(); + private transient String localString = ""; + + /** + * Hive store. + * + * @deprecated use {@link AbstractStoreOutputOperator#store} instead + */ + @Deprecated + protected HiveStore hivestore; + + /** + * The file system used to write to. 
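+   * Obtained from {@link #getHDFSInstance()}. When the file system scheme is not hdfs, the
+   * generated statements use "load data local inpath ..." instead of "load data inpath ...".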
+ */ + protected transient FileSystem fs; + @Nonnull + protected String tablename; + @Nonnull + protected String hivepath; + /** + * This is the operator context passed at setup. + */ + private transient OperatorContext context; + + /** + * File output counters. + */ + private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class); + + /** + * The total time in milliseconds the operator has been running for. + */ + private long totalTime; + /** + * Last time stamp collected. + */ + private long lastTimeStamp; + /** + * The total number of bytes written by the operator. + */ + protected long totalBytesWritten = 0; + + public HiveOperator() + { + store = new HiveStore(); + hivestore = store; + } + + @Override + public void setup(OperatorContext context) + { + try { + fs = getHDFSInstance(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + + this.context = context; + lastTimeStamp = System.currentTimeMillis(); + + fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong()); + fileCounters.setCounter(Counters.TOTAL_TIME_ELAPSED, new MutableLong()); + super.setup(context); + } + + /** + * Override this method to change the FileSystem instance that is used by the + * operator. + * + * @return A FileSystem object. + * @throws IOException + */ + protected FileSystem getHDFSInstance() throws IOException + { + FileSystem tempFS = FileSystem.newInstance(new Path(store.filepath).toUri(), new Configuration()); + if (!tempFS.getScheme().equalsIgnoreCase("hdfs")) { + localString = " local"; + } + return tempFS; + } + + /** + * Function to process each incoming tuple. The input is FilePartitionMapping + * which is a POJO containing filename which is already committed and will not + * be changed.The POJO also contains the hive partitions to which the + * respective files will be moved. + * + * @param tuple + * incoming tuple which has filename and hive partition. + */ + @Override + public void processTuple(FilePartitionMapping tuple) + { + String command = processHiveFile(tuple); + logger.debug("commands is {}", command); + if (command != null) { + Statement stmt; + try { + stmt = store.getConnection().createStatement(); + stmt.execute(command); + } catch (SQLException ex) { + throw new RuntimeException("Moving file into hive failed" + ex); + } + } + + } + + /* + * This function extracts the filename and partitions to which the file will be loaded. + * It returns the command to be executed by hive. 
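+   * Example: for partition column "dt" and partition value "2014-12-10" the returned command is
+   * load data inpath '<filepath>/<filename>' into table <tablename> PARTITION( dt='2014-12-10' )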
+ */ + private String processHiveFile(FilePartitionMapping tuple) + { + String filename = tuple.getFilename(); + ArrayList<String> partition = tuple.getPartition(); + String command = null; + String filepath = store.getFilepath() + Path.SEPARATOR + filename; + logger.debug("processing {} filepath", filepath); + int numPartitions = partition.size(); + try { + if (fs.exists(new Path(filepath))) { + if (numPartitions > 0) { + StringBuilder partitionString = new StringBuilder( + hivePartitionColumns.get(0) + "='" + partition.get(0) + "'"); + int i = 0; + while (i < numPartitions) { + i++; + if (i == numPartitions) { + break; + } + partitionString.append(",").append(hivePartitionColumns.get(i)).append("='").append(partition.get(i)) + .append("'"); + } + if (i < hivePartitionColumns.size()) { + partitionString.append(",").append(hivePartitionColumns.get(i)); + } + command = "load data" + localString + " inpath '" + filepath + "' into table " + tablename + " PARTITION" + + "( " + partitionString + " )"; + } else { + command = "load data" + localString + " inpath '" + filepath + "' into table " + tablename; + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + logger.debug("command is {}", command); + return command; + } + + @Override + public void endWindow() + { + long currentTimeStamp = System.currentTimeMillis(); + totalTime += currentTimeStamp - lastTimeStamp; + lastTimeStamp = currentTimeStamp; + + fileCounters.getCounter(Counters.TOTAL_TIME_ELAPSED).setValue(totalTime); + fileCounters.getCounter(Counters.TOTAL_BYTES_WRITTEN).setValue(totalBytesWritten); + context.setCounters(fileCounters); + } + + public void teardown() + { + long currentTimeStamp = System.currentTimeMillis(); + totalTime += currentTimeStamp - lastTimeStamp; + lastTimeStamp = currentTimeStamp; + } + + /** + * Get the partition columns in hive to which data needs to be loaded. + * + * @return List of Hive Partition Columns + */ + public ArrayList<String> getHivePartitionColumns() + { + return hivePartitionColumns; + } + + /** + * Set the hive partition columns to which data needs to be loaded. + * + * @param hivePartitionColumns + */ + public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns) + { + this.hivePartitionColumns = hivePartitionColumns; + } + + /** + * Get the table name in hive. + * + * @return table name + */ + public String getTablename() + { + return tablename; + } + + /** + * Set the table name in hive. + * + * @param tablename + */ + public void setTablename(String tablename) + { + this.tablename = tablename; + } + + /** + * Gets the store set for hive; + * + * @deprecated use {@link #getStore()} instead. + * @return hive store + */ + @Deprecated + public HiveStore getHivestore() + { + return store; + } + + /** + * Set the store in hive. + * + * @deprecated use {@link #setStore()} instead. + * @param hivestore + */ + @Deprecated + public void setHivestore(HiveStore hivestore) + { + this.hivestore = hivestore; + super.setStore(hivestore); + } + + public static enum Counters + { + /** + * An enum for counters representing the total number of bytes written by + * the operator. + */ + TOTAL_BYTES_WRITTEN, + + /** + * An enum for counters representing the total time the operator has been + * operational for. 
+ */ + TOTAL_TIME_ELAPSED + } + + private static final Logger logger = LoggerFactory.getLogger(HiveOperator.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/HiveStore.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveStore.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveStore.java new file mode 100755 index 0000000..362a824 --- /dev/null +++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveStore.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.hive; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.db.jdbc.JdbcStore; + +/** + * Hive Store that extends Jdbc Store and provides its own driver name. + * + * @since 2.1.0 + */ +public class HiveStore extends JdbcStore +{ + public HiveStore() + { + super(); + this.setDatabaseDriver(HIVE_DRIVER); + } + + public static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; + @NotNull + public String filepath; + + public String getFilepath() + { + return filepath; + } + + public void setFilepath(String filepath) + { + this.filepath = filepath; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java new file mode 100755 index 0000000..becab2c --- /dev/null +++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.hive; + +import java.io.ByteArrayOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; + +/** + * Stream codec for uniform distribution of tuples on upstream operator. This is + * used to make sure that data being sent to a particular hive partition goes to + * a specific operator partition by passing FSRollingOutputOperator to the + * stream codec. + * + * @since 2.1.0 + */ +public class HiveStreamCodec<T> extends KryoSerializableStreamCodec<T> implements Externalizable +{ + private static final long serialVersionUID = 201412121604L; + + protected AbstractFSRollingOutputOperator<T> rollingOperator; + + @Override + public void writeExternal(ObjectOutput out) throws IOException + { + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + ObjectOutputStream obj = new ObjectOutputStream(os); + Output output = new Output(obj); + kryo.writeClassAndObject(output, rollingOperator); + byte[] outBytes = output.toBytes(); + out.writeInt(outBytes.length); + out.write(outBytes, 0, outBytes.length); + out.flush(); + } + + @Override + @SuppressWarnings("unchecked") + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException + { + int size = in.readInt(); + byte[] data = new byte[size]; + in.readFully(data); + Input input = new Input(data); + input.setBuffer(data); + rollingOperator = (AbstractFSRollingOutputOperator)kryo.readClassAndObject(input); + } + + @Override + public int getPartition(T o) + { + return rollingOperator.getHivePartition(o).hashCode(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/com/datatorrent/contrib/hive/package-info.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/package-info.java b/hive/src/main/java/com/datatorrent/contrib/hive/package-info.java new file mode 100644 index 0000000..ad93d8f --- /dev/null +++ b/hive/src/main/java/com/datatorrent/contrib/hive/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ [email protected] +package com.datatorrent.contrib.hive; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java new file mode 100644 index 0000000..4f58cf4 --- /dev/null +++ b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java @@ -0,0 +1,482 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.hive; + +import java.util.ArrayList; +import java.util.Arrays; + +import javax.annotation.Nonnull; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.apache.commons.beanutils.ConvertUtils; +import org.apache.commons.beanutils.converters.AbstractConverter; +import org.apache.commons.beanutils.converters.ArrayConverter; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.contrib.hive.FSPojoToHiveOperator; +import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE; +import com.datatorrent.contrib.hive.HiveOperator; +import com.datatorrent.contrib.hive.HiveStore; + +/** + * HiveOutputModule provides abstraction for the operators needed for writing + * tuples to hive. This module will be expanded to FSPojoToHiveOperator and + * HiveOperator in physical plan. + */ [email protected] +public class HiveOutputModule implements Module +{ + + /** + * The path of the directory to where files are written. + */ + @NotNull + private String filePath; + + /** + * Names of the columns in hive table (excluding partitioning columns). + */ + private String[] hiveColumns; + + /** + * Data types of the columns in hive table (excluding partitioning columns). + * This sequence should match to the fields in hiveColumnDataTypes + */ + private FIELD_TYPE[] hiveColumnDataTypes; + + /** + * Expressions for the hive columns (excluding partitioning columns). This + * sequence should match to the fields in hiveColumnDataTypes + */ + private String[] expressionsForHiveColumns; + + /** + * Names of the columns on which hive data should be partitioned + */ + private String[] hivePartitionColumns; + + /** + * Data types of the columns on which hive data should be partitioned. This + * sequence should match to the fields in hivePartitionColumns + */ + private FIELD_TYPE[] hivePartitionColumnDataTypes; + + /** + * Expressions for the hive partition columns. 
This sequence should match to + * the fields in hivePartitionColumns + */ + private String[] expressionsForHivePartitionColumns; + + /** + * The maximum length in bytes of a rolling file. Default value is 128MB. + */ + @Min(1) + protected Long maxLength = 134217728L; + + /** + * Connection URL for connecting to hive. + */ + @NotNull + private String databaseUrl; + + /** + * Driver for connecting to hive. + */ + @NotNull + private String databaseDriver; + + /** + * Username for connecting to hive + */ + private String userName; + + /** + * Password for connecting to hive + */ + private String password; + + /** + * Table name for writing data into hive + */ + @Nonnull + protected String tablename; + + /** + * Input port for files metadata. + */ + public final transient ProxyInputPort<Object> input = new ProxyInputPort<Object>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + FSPojoToHiveOperator fsRolling = dag.addOperator("fsRolling", new FSPojoToHiveOperator()); + HiveOperator hiveOperator = dag.addOperator("HiveOperator", new HiveOperator()); + + input.set(fsRolling.input); + dag.addStream("toHive", fsRolling.outputPort, hiveOperator.input); + fsRolling.setFilePath(filePath); + fsRolling.setHiveColumns(new ArrayList<String>(Arrays.asList(hiveColumns))); + fsRolling.setHiveColumnDataTypes(new ArrayList<FIELD_TYPE>(Arrays.asList(hiveColumnDataTypes))); + fsRolling.setExpressionsForHiveColumns(new ArrayList<String>(Arrays.asList(expressionsForHiveColumns))); + + fsRolling.setHivePartitionColumns(new ArrayList<String>(Arrays.asList(hivePartitionColumns))); + fsRolling.setHivePartitionColumnDataTypes(new ArrayList<FIELD_TYPE>(Arrays.asList(hivePartitionColumnDataTypes))); + fsRolling.setExpressionsForHivePartitionColumns( + new ArrayList<String>(Arrays.asList(expressionsForHivePartitionColumns))); + + fsRolling.setMaxLength(maxLength); + fsRolling.setAlwaysWriteToTmp(true); + fsRolling.setRotationWindows(0); + + hiveOperator.setHivePartitionColumns(new ArrayList<String>(Arrays.asList(hivePartitionColumns))); + HiveStore hiveStore = hiveOperator.getStore(); + hiveStore.setFilepath(filePath); + hiveStore.setDatabaseUrl(databaseUrl); + hiveStore.setDatabaseDriver(databaseDriver); + hiveStore.getConnectionProperties().put("user", userName); + if (password != null) { + hiveStore.getConnectionProperties().put("password", password); + } + hiveOperator.setTablename(tablename); + } + + /** + * The path of the directory to where files are written. + * + * @return file path + */ + public String getFilePath() + { + return filePath; + } + + /** + * The path of the directory to where files are written. + * + * @param filePath + * file path + */ + public void setFilePath(String filePath) + { + this.filePath = filePath; + } + + /** + * Names of the columns in hive table (excluding partitioning columns). + * + * @return Hive column names + */ + public String[] getHiveColumns() + { + return hiveColumns; + } + + /** + * Names of the columns in hive table (excluding partitioning columns). + * + * @param hiveColumns + * Hive column names + */ + public void setHiveColumns(String[] hiveColumns) + { + this.hiveColumns = hiveColumns; + } + + /** + * Data types of the columns in hive table (excluding partitioning columns). 
+ * This sequence should match to the fields in hiveColumnDataTypes + * + * @return Hive column data types + */ + public FIELD_TYPE[] getHiveColumnDataTypes() + { + return hiveColumnDataTypes; + } + + /** + * Data types of the columns in hive table (excluding partitioning columns). + * This sequence should match to the fields in hiveColumnDataTypes * + * + * @param hiveColumnDataTypes + * Hive column data types + */ + public void setHiveColumnDataTypes(FIELD_TYPE[] hiveColumnDataTypes) + { + this.hiveColumnDataTypes = hiveColumnDataTypes; + } + + /** + * Expressions for the hive columns (excluding partitioning columns). This + * sequence should match to the fields in hiveColumnDataTypes + * + * @return + */ + public String[] getExpressionsForHiveColumns() + { + return expressionsForHiveColumns; + } + + /** + * Expressions for the hive columns (excluding partitioning columns). This + * sequence should match to the fields in hiveColumnDataTypes + * + * @param expressionsForHiveColumns + */ + public void setExpressionsForHiveColumns(String[] expressionsForHiveColumns) + { + this.expressionsForHiveColumns = expressionsForHiveColumns; + } + + /** + * Names of the columns on which hive data should be partitioned + * + * @return hive partition columns + */ + public String[] getHivePartitionColumns() + { + return hivePartitionColumns; + } + + /** + * Names of the columns on which hive data should be partitioned + * + * @param hivePartitionColumns + * Hive partition columns + */ + public void setHivePartitionColumns(String[] hivePartitionColumns) + { + this.hivePartitionColumns = hivePartitionColumns; + } + + /** + * Data types of the columns on which hive data should be partitioned. This + * sequence should match to the fields in hivePartitionColumns + * + * @return Hive partition column data types + */ + public FIELD_TYPE[] getHivePartitionColumnDataTypes() + { + return hivePartitionColumnDataTypes; + } + + /** + * Data types of the columns on which hive data should be partitioned. This + * sequence should match to the fields in hivePartitionColumns + * + * @param hivePartitionColumnDataTypes + * Hive partition column data types + */ + public void setHivePartitionColumnDataTypes(FIELD_TYPE[] hivePartitionColumnDataTypes) + { + this.hivePartitionColumnDataTypes = hivePartitionColumnDataTypes; + } + + /** + * Expressions for the hive partition columns. This sequence should match to + * the fields in hivePartitionColumns + * + * @return Expressions for hive partition columns + */ + public String[] getExpressionsForHivePartitionColumns() + { + return expressionsForHivePartitionColumns; + } + + /** + * Expressions for the hive partition columns. This sequence should match to + * the fields in hivePartitionColumns + * + * @param expressionsForHivePartitionColumns + * Expressions for hive partition columns + */ + public void setExpressionsForHivePartitionColumns(String[] expressionsForHivePartitionColumns) + { + this.expressionsForHivePartitionColumns = expressionsForHivePartitionColumns; + } + + /** + * The maximum length in bytes of a rolling file. + * + * @return maximum size of file + */ + public Long getMaxLength() + { + return maxLength; + } + + /** + * The maximum length in bytes of a rolling file. + * + * @param maxLength + * maximum size of file + */ + public void setMaxLength(Long maxLength) + { + this.maxLength = maxLength; + } + + /** + * Connection URL for connecting to hive. 
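+   * Example: {@code jdbc:hive2://<host>:10000/default}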
+ * + * @return database url + */ + public String getDatabaseUrl() + { + return databaseUrl; + } + + /** + * Connection URL for connecting to hive. + * + * @param databaseUrl + * database url + */ + public void setDatabaseUrl(String databaseUrl) + { + this.databaseUrl = databaseUrl; + } + + /** + * Driver for connecting to hive. + * + * @return database driver + */ + public String getDatabaseDriver() + { + return databaseDriver; + } + + /** + * Driver for connecting to hive. + * + * @param databaseDriver + * database driver + */ + public void setDatabaseDriver(String databaseDriver) + { + this.databaseDriver = databaseDriver; + } + + /** + * Username for connecting to hive + * + * @return user name + */ + public String getUserName() + { + return userName; + } + + /** + * Username for connecting to hive + * + * @param username + * user name + */ + public void setUserName(String userName) + { + this.userName = userName; + } + + /** + * Password for connecting to hive + * + * @return password + */ + public String getPassword() + { + return password; + } + + /** + * Password for connecting to hive + * + * @param password + * password + */ + public void setPassword(String password) + { + this.password = password; + } + + /** + * Table name for writing data into hive + * + * @return table name + */ + public String getTablename() + { + return tablename; + } + + /** + * Table name for writing data into hive + * + * @param tablename + * table name + */ + public void setTablename(String tablename) + { + this.tablename = tablename; + } + + static { + //Code for enabling BeanUtils to accept comma separated string to initialize FIELD_TYPE[] + class FieldTypeConvertor extends AbstractConverter + { + + @Override + protected Object convertToType(Class type, Object value) throws Throwable + { + if (value instanceof String) { + + return FIELD_TYPE.valueOf((String)value); + } else { + throw new IllegalArgumentException("FIELD_TYPE should be specified as String"); + } + } + + @Override + protected Class getDefaultType() + { + return FIELD_TYPE.class; + } + } + + class FieldTypeArrayConvertor extends ArrayConverter + { + + public FieldTypeArrayConvertor() + { + super(FIELD_TYPE[].class, new FieldTypeConvertor()); + } + } + + ConvertUtils.register(new FieldTypeConvertor(), FIELD_TYPE.class); + ConvertUtils.register(new FieldTypeConvertor(), FIELD_TYPE.class); + ConvertUtils.register(new FieldTypeArrayConvertor(), FIELD_TYPE[].class); + + ConvertUtils.lookup(FIELD_TYPE.class).getClass(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java ---------------------------------------------------------------------- diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java b/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java new file mode 100755 index 0000000..27b727f --- /dev/null +++ b/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.hive; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + +public class FSRollingMapTestImpl extends AbstractFSRollingOutputOperator<Map<String, Object>> +{ + @Override + public ArrayList<String> getHivePartition(Map<String, Object> tuple) + { + ArrayList<String> hivePartitions = new ArrayList<String>(); + hivePartitions.add("2014-12-10"); + return (hivePartitions); + } + + @Override + protected byte[] getBytesForTuple(Map<String, Object> tuple) + { + Iterator<String> keyIter = tuple.keySet().iterator(); + StringBuilder writeToHive = new StringBuilder(""); + + while (keyIter.hasNext()) { + String key = keyIter.next(); + Object obj = tuple.get(key); + writeToHive.append(key).append(":").append(obj).append("\n"); + } + return writeToHive.toString().getBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java ---------------------------------------------------------------------- diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java b/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java new file mode 100755 index 0000000..f3bade6 --- /dev/null +++ b/hive/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.hive; + +import java.util.ArrayList; + +public class FSRollingTestImpl extends AbstractFSRollingOutputOperator<String> +{ + @Override + public ArrayList<String> getHivePartition(String tuple) + { + ArrayList<String> hivePartitions = new ArrayList<String>(); + hivePartitions.add(tuple); + return (hivePartitions); + } + + @Override + protected byte[] getBytesForTuple(String tuple) + { + return (tuple + "\n").getBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java ---------------------------------------------------------------------- diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java new file mode 100755 index 0000000..3c64bdf --- /dev/null +++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java @@ -0,0 +1,618 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.hive; + +import java.io.File; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.thrift.TException; + +import io.teknek.hiveunit.HiveTestService; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.serializers.FieldSerializer; + +import com.datatorrent.api.Attribute.AttributeMap; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator.ProcessingMode; +import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitionMapping; +import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE; +import com.datatorrent.lib.helper.OperatorContextTestHelper; + + +public class HiveMockTest extends HiveTestService +{ + public static final String APP_ID = "HiveOperatorTest"; + + public static final int OPERATOR_ID = 0; + public static final int NUM_WINDOWS = 10; + public static final int BLAST_SIZE = 10; + public static final int DATABASE_SIZE = NUM_WINDOWS * BLAST_SIZE; + public static final String tablename = "temp"; + public static final String tablepojo = "temppojo"; + public static final String tablemap = "tempmap"; + public static String delimiterMap = ":"; + public static final String HOST = "localhost"; + public static final String PORT = "10000"; + public static final String DATABASE = "default"; + public static final String HOST_PREFIX = "jdbc:hive://"; + + public String testdir; + + private String getDir() + { + String filePath = new File("target/hive").getAbsolutePath(); + LOG.info("filepath is {}", filePath); + return filePath; + } + + @Override + public void setUp() throws Exception + { + super.setUp(); + } + + private static final int NUMBER_OF_TESTS = 4; // count your tests here + private int sTestsRun = 0; + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + sTestsRun++; + //Equivalent of @AfterClass in junit 3. 
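+    //the temporary warehouse/derby directory is deleted only after the last test of this class has run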
+ if (sTestsRun == NUMBER_OF_TESTS) { + FileUtils.deleteQuietly(new File(testdir)); + } + } + + public HiveMockTest() throws IOException, Exception + { + super(); + testdir = getDir(); + Properties properties = System.getProperties(); + properties.put("derby.system.home", testdir); + System.setProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), testdir); + new File(testdir).mkdir(); + } + + public static HiveStore createStore(HiveStore hiveStore) + { + String host = HOST; + String port = PORT; + + if (hiveStore == null) { + hiveStore = new HiveStore(); + } + + StringBuilder sb = new StringBuilder(); + String tempHost = HOST_PREFIX + host + ":" + port; + + LOG.debug("Host name: {}", tempHost); + LOG.debug("Port: {}", 10000); + + sb.append("user:").append("").append(","); + sb.append("password:").append(""); + + String properties = sb.toString(); + LOG.debug(properties); + hiveStore.setDatabaseDriver("org.apache.hive.jdbc.HiveDriver"); + + hiveStore.setDatabaseUrl("jdbc:hive2://"); + hiveStore.setConnectionProperties(properties); + return hiveStore; + } + + public static void hiveInitializeDatabase(HiveStore hiveStore) throws SQLException + { + hiveStore.connect(); + Statement stmt = hiveStore.getConnection().createStatement(); + // show tables + String sql = "show tables"; + + LOG.debug(sql); + ResultSet res = stmt.executeQuery(sql); + if (res.next()) { + LOG.debug("tables are {}", res.getString(1)); + } + + stmt.execute("DROP TABLE " + tablename); + + stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 String) PARTITIONED BY(dt STRING) " + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' \n" + "STORED AS TEXTFILE "); + /*ResultSet res = stmt.execute("CREATE TABLE IF NOT EXISTS temp4 (col1 map<string,int>,col2 map<string,int>,col3 map<string,int>,col4 map<String,timestamp>, col5 map<string,double>,col6 map<string,double>,col7 map<string,int>,col8 map<string,int>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' \n" + + "COLLECTION ITEMS TERMINATED BY '\n' \n" + + "MAP KEYS TERMINATED BY ':' \n" + + "LINES TERMINATED BY '\n' " + + "STORED AS TEXTFILE");*/ + + hiveStore.disconnect(); + } + + public static void hiveInitializePOJODatabase(HiveStore hiveStore) throws SQLException + { + hiveStore.connect(); + Statement stmt = hiveStore.getConnection().createStatement(); + // show tables + String sql = "show tables"; + + LOG.debug(sql); + ResultSet res = stmt.executeQuery(sql); + if (res.next()) { + LOG.debug("tables are {}", res.getString(1)); + } + stmt.execute("DROP TABLE " + tablepojo); + + stmt.execute("CREATE TABLE " + tablepojo + " (col1 int) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED " + + "FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' \n" + "STORED AS TEXTFILE "); + hiveStore.disconnect(); + } + + public static void hiveInitializeMapDatabase(HiveStore hiveStore) throws SQLException + { + hiveStore.connect(); + Statement stmt = hiveStore.getConnection().createStatement(); + // show tables + String sql = "show tables"; + + LOG.debug(sql); + ResultSet res = stmt.executeQuery(sql); + if (res.next()) { + LOG.debug(res.getString(1)); + } + + stmt.execute("DROP TABLE " + tablemap); + + stmt.execute("CREATE TABLE IF NOT EXISTS " + tablemap + + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' \n" + + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n" + "STORED AS TEXTFILE "); + hiveStore.disconnect(); + } + + @Test + public void testInsertString() throws Exception + { + HiveStore hiveStore = 
createStore(null); + hiveStore.setFilepath(testdir); + ArrayList<String> hivePartitionColumns = new ArrayList<String>(); + hivePartitionColumns.add("dt"); + hiveInitializeDatabase(createStore(null)); + HiveOperator hiveOperator = new HiveOperator(); + hiveOperator.setStore(hiveStore); + hiveOperator.setTablename(tablename); + hiveOperator.setHivePartitionColumns(hivePartitionColumns); + + FSRollingTestImpl fsRolling = new FSRollingTestImpl(); + fsRolling.setFilePath(testdir); + short permission = 511; + fsRolling.setFilePermission(permission); + fsRolling.setAlwaysWriteToTmp(false); + fsRolling.setMaxLength(128); + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); + attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + fsRolling.setup(context); + hiveOperator.setup(context); + FilePartitionMapping mapping1 = new FilePartitionMapping(); + FilePartitionMapping mapping2 = new FilePartitionMapping(); + mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-10" + "/" + "0-transaction.out.part.0"); + ArrayList<String> partitions1 = new ArrayList<String>(); + partitions1.add("2014-12-10"); + mapping1.setPartition(partitions1); + ArrayList<String> partitions2 = new ArrayList<String>(); + partitions2.add("2014-12-11"); + mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0"); + mapping2.setPartition(partitions2); + for (int wid = 0, total = 0; wid < NUM_WINDOWS; wid++) { + fsRolling.beginWindow(wid); + for (int tupleCounter = 0; tupleCounter < BLAST_SIZE && total < DATABASE_SIZE; tupleCounter++, total++) { + fsRolling.input.process("2014-12-1" + tupleCounter); + } + if (wid == 7) { + fsRolling.committed(wid - 1); + hiveOperator.processTuple(mapping1); + hiveOperator.processTuple(mapping2); + } + + fsRolling.endWindow(); + } + + fsRolling.teardown(); + hiveStore.connect(); + client.execute("select * from " + tablename + " where dt='2014-12-10'"); + List<String> recordsInDatePartition1 = client.fetchAll(); + + client.execute("select * from " + tablename + " where dt='2014-12-11'"); + List<String> recordsInDatePartition2 = client.fetchAll(); + client.execute("drop table " + tablename); + hiveStore.disconnect(); + + Assert.assertEquals(7, recordsInDatePartition1.size()); + for (int i = 0; i < recordsInDatePartition1.size(); i++) { + LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i)); + /*An array containing partition and data is returned as a string record, hence we need to upcast it to an object first + and then downcast to a string in order to use in Assert.*/ + Object record = recordsInDatePartition1.get(i); + Object[] records = (Object[])record; + Assert.assertEquals("2014-12-10", records[1]); + } + Assert.assertEquals(7, recordsInDatePartition2.size()); + for (int i = 0; i < recordsInDatePartition2.size(); i++) { + LOG.debug("records in second date partition are {}", recordsInDatePartition2.get(i)); + Object record = recordsInDatePartition2.get(i); + Object[] records = (Object[])record; + Assert.assertEquals("2014-12-11", records[1]); + } + } + + @Test + public void testInsertPOJO() throws Exception + { + HiveStore hiveStore = createStore(null); + hiveStore.setFilepath(testdir); + 
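+ //Configure the Hive column/partition metadata and the getter expressions
+ //(getId(), getDate()) that FSPojoToHiveOperator evaluates on each incoming POJO.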
ArrayList<String> hivePartitionColumns = new ArrayList<String>(); + hivePartitionColumns.add("dt"); + ArrayList<String> hiveColumns = new ArrayList<String>(); + hiveColumns.add("col1"); + hiveInitializePOJODatabase(createStore(null)); + HiveOperator hiveOperator = new HiveOperator(); + hiveOperator.setStore(hiveStore); + hiveOperator.setTablename(tablepojo); + hiveOperator.setHivePartitionColumns(hivePartitionColumns); + + FSPojoToHiveOperator fsRolling = new FSPojoToHiveOperator(); + fsRolling.setFilePath(testdir); + fsRolling.setHiveColumns(hiveColumns); + ArrayList<FIELD_TYPE> fieldtypes = new ArrayList<FIELD_TYPE>(); + ArrayList<FIELD_TYPE> partitiontypes = new ArrayList<FIELD_TYPE>(); + fieldtypes.add(FIELD_TYPE.INTEGER); + partitiontypes.add(FIELD_TYPE.STRING); + fsRolling.setHiveColumnDataTypes(fieldtypes); + fsRolling.setHivePartitionColumnDataTypes(partitiontypes); + //ArrayList<FIELD_TYPE> partitionColumnType = new ArrayList<FIELD_TYPE>(); + //partitionColumnType.add(FIELD_TYPE.STRING); + fsRolling.setHivePartitionColumns(hivePartitionColumns); + // fsRolling.setHivePartitionColumnsDataTypes(partitionColumnType); + ArrayList<String> expressions = new ArrayList<String>(); + expressions.add("getId()"); + ArrayList<String> expressionsPartitions = new ArrayList<String>(); + + expressionsPartitions.add("getDate()"); + short permission = 511; + fsRolling.setFilePermission(permission); + fsRolling.setAlwaysWriteToTmp(false); + fsRolling.setMaxLength(128); + fsRolling.setExpressionsForHiveColumns(expressions); + fsRolling.setExpressionsForHivePartitionColumns(expressionsPartitions); + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); + attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + fsRolling.setup(context); + hiveOperator.setup(context); + FilePartitionMapping mapping1 = new FilePartitionMapping(); + FilePartitionMapping mapping2 = new FilePartitionMapping(); + mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0"); + ArrayList<String> partitions1 = new ArrayList<String>(); + partitions1.add("2014-12-11"); + mapping1.setPartition(partitions1); + ArrayList<String> partitions2 = new ArrayList<String>(); + partitions2.add("2014-12-12"); + mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-12" + "/" + "0-transaction.out.part.0"); + mapping2.setPartition(partitions2); + for (int wid = 0, total = 0; wid < NUM_WINDOWS; wid++) { + fsRolling.beginWindow(wid); + for (int tupleCounter = 1; tupleCounter < BLAST_SIZE && total < DATABASE_SIZE; tupleCounter++, total++) { + InnerObj innerObj = new InnerObj(); + innerObj.setId(tupleCounter); + innerObj.setDate("2014-12-1" + tupleCounter); + fsRolling.input.process(innerObj); + } + if (wid == 7) { + fsRolling.committed(wid - 1); + hiveOperator.processTuple(mapping1); + hiveOperator.processTuple(mapping2); + } + + fsRolling.endWindow(); + } + + fsRolling.teardown(); + hiveStore.connect(); + client.execute("select * from " + tablepojo + " where dt='2014-12-11'"); + List<String> recordsInDatePartition1 = client.fetchAll(); + + client.execute("select * from " + tablepojo + " where dt='2014-12-12'"); + List<String> recordsInDatePartition2 = client.fetchAll(); + 
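+ //Drop the test table and release the JDBC connection before asserting on the fetched rows.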
client.execute("drop table " + tablepojo); + hiveStore.disconnect(); + + Assert.assertEquals(7, recordsInDatePartition1.size()); + for (int i = 0; i < recordsInDatePartition1.size(); i++) { + LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i)); + /*An array containing partition and data is returned as a string record, hence we need to upcast it to an object first + and then downcast to a string in order to use in Assert.*/ + Object record = recordsInDatePartition1.get(i); + Object[] records = (Object[])record; + Assert.assertEquals(1, records[0]); + Assert.assertEquals("2014-12-11", records[1]); + } + Assert.assertEquals(7, recordsInDatePartition2.size()); + for (int i = 0; i < recordsInDatePartition2.size(); i++) { + LOG.debug("records in second date partition are {}", recordsInDatePartition2.get(i)); + Object record = recordsInDatePartition2.get(i); + Object[] records = (Object[])record; + Assert.assertEquals(2, records[0]); + Assert.assertEquals("2014-12-12", records[1]); + } + } + + @Test + public void testHiveInsertMapOperator() throws SQLException, TException + { + HiveStore hiveStore = createStore(null); + hiveStore.setFilepath(testdir); + ArrayList<String> hivePartitionColumns = new ArrayList<String>(); + hivePartitionColumns.add("dt"); + hiveInitializeMapDatabase(createStore(null)); + HiveOperator hiveOperator = new HiveOperator(); + hiveOperator.setStore(hiveStore); + hiveOperator.setTablename(tablemap); + hiveOperator.setHivePartitionColumns(hivePartitionColumns); + + FSRollingMapTestImpl fsRolling = new FSRollingMapTestImpl(); + fsRolling.setFilePath(testdir); + short permission = 511; + fsRolling.setFilePermission(permission); + fsRolling.setAlwaysWriteToTmp(false); + fsRolling.setMaxLength(128); + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); + attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + fsRolling.setup(context); + hiveOperator.setup(context); + HashMap<String, Object> map = new HashMap<String, Object>(); + FilePartitionMapping mapping1 = new FilePartitionMapping(); + FilePartitionMapping mapping2 = new FilePartitionMapping(); + ArrayList<String> partitions1 = new ArrayList<String>(); + partitions1.add("2014-12-10"); + mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-10" + "/" + "0-transaction.out.part.0"); + mapping1.setPartition(partitions1); + ArrayList<String> partitions2 = new ArrayList<String>(); + partitions2.add("2014-12-11"); + mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0"); + mapping2.setPartition(partitions2); + for (int wid = 0; wid < NUM_WINDOWS; wid++) { + fsRolling.beginWindow(wid); + for (int tupleCounter = 0; tupleCounter < BLAST_SIZE; tupleCounter++) { + map.put(2014 - 12 - 10 + "", 2014 - 12 - 10); + fsRolling.input.put(map); + map.clear(); + } + + if (wid == 7) { + fsRolling.committed(wid - 1); + hiveOperator.processTuple(mapping1); + hiveOperator.processTuple(mapping2); + } + + fsRolling.endWindow(); + } + + fsRolling.teardown(); + + hiveStore.connect(); + + client.execute("select * from " + tablemap + " where dt='2014-12-10'"); + List<String> recordsInDatePartition1 = client.fetchAll(); + + client.execute("drop table " 
+ tablemap); + hiveStore.disconnect(); + + Assert.assertEquals(13, recordsInDatePartition1.size()); + for (int i = 0; i < recordsInDatePartition1.size(); i++) { + LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i)); + /*An array containing partition and data is returned as a string record, hence we need to upcast it to an object first + and then downcast to a string in order to use in Assert.*/ + Object record = recordsInDatePartition1.get(i); + Object[] records = (Object[])record; + Assert.assertEquals("2014-12-10", records[1]); + } + + } + + @Test + public void testHDFSHiveCheckpoint() throws SQLException, TException + { + hiveInitializeDatabase(createStore(null)); + HiveStore hiveStore = createStore(null); + hiveStore.setFilepath(testdir); + HiveOperator outputOperator = new HiveOperator(); + HiveOperator newOp; + outputOperator.setStore(hiveStore); + ArrayList<String> hivePartitionColumns = new ArrayList<String>(); + hivePartitionColumns.add("dt"); + FSRollingTestImpl fsRolling = new FSRollingTestImpl(); + hiveInitializeDatabase(createStore(null)); + outputOperator.setHivePartitionColumns(hivePartitionColumns); + outputOperator.setTablename(tablename); + fsRolling.setFilePath(testdir); + short persmission = 511; + fsRolling.setFilePermission(persmission); + fsRolling.setAlwaysWriteToTmp(false); + fsRolling.setMaxLength(128); + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); + attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( + OPERATOR_ID, attributeMap); + + fsRolling.setup(context); + + FilePartitionMapping mapping1 = new FilePartitionMapping(); + FilePartitionMapping mapping2 = new FilePartitionMapping(); + FilePartitionMapping mapping3 = new FilePartitionMapping(); + outputOperator.setup(context); + mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-10" + "/" + "0-transaction.out.part.0"); + ArrayList<String> partitions1 = new ArrayList<String>(); + partitions1.add("2014-12-10"); + mapping1.setPartition(partitions1); + mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0"); + ArrayList<String> partitions2 = new ArrayList<String>(); + partitions2.add("2014-12-11"); + mapping2.setPartition(partitions2); + ArrayList<String> partitions3 = new ArrayList<String>(); + partitions3.add("2014-12-12"); + mapping3.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-12" + "/" + "0-transaction.out.part.0"); + mapping3.setPartition(partitions3); + for (int wid = 0, total = 0; wid < NUM_WINDOWS; wid++) { + fsRolling.beginWindow(wid); + for (int tupleCounter = 0; tupleCounter < BLAST_SIZE && total < DATABASE_SIZE; tupleCounter++, total++) { + fsRolling.input.process("2014-12-1" + tupleCounter); + } + if (wid == 7) { + fsRolling.committed(wid - 1); + outputOperator.processTuple(mapping1); + outputOperator.processTuple(mapping2); + } + + fsRolling.endWindow(); + + if (wid == 9) { + Kryo kryo = new Kryo(); + FieldSerializer<HiveOperator> f1 = (FieldSerializer<HiveOperator>)kryo.getSerializer(HiveOperator.class); + FieldSerializer<HiveStore> f2 = (FieldSerializer<HiveStore>)kryo.getSerializer(HiveStore.class); + + f1.setCopyTransient(false); + f2.setCopyTransient(false); + newOp = kryo.copy(outputOperator); + 
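+ //Simulate recovery from a checkpoint: transient fields are excluded from the Kryo copy,
+ //the original operator is torn down, and the copy replays window 7 with mapping3.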
outputOperator.teardown(); + newOp.setup(context); + + newOp.beginWindow(7); + newOp.processTuple(mapping3); + + newOp.endWindow(); + newOp.teardown(); + break; + } + + } + + hiveStore.connect(); + + client.execute("select * from " + tablename + " where dt='2014-12-10'"); + List<String> recordsInDatePartition1 = client.fetchAll(); + + client.execute("select * from " + tablename + " where dt='2014-12-11'"); + List<String> recordsInDatePartition2 = client.fetchAll(); + + client.execute("select * from " + tablename + " where dt='2014-12-12'"); + List<String> recordsInDatePartition3 = client.fetchAll(); + + client.execute("drop table " + tablename); + hiveStore.disconnect(); + + Assert.assertEquals(7, recordsInDatePartition1.size()); + for (int i = 0; i < recordsInDatePartition1.size(); i++) { + LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i)); + /*An array containing partition and data is returned as a string record, hence we need to upcast it to an object first + and then downcast to a string in order to use in Assert.*/ + Object record = recordsInDatePartition1.get(i); + Object[] records = (Object[])record; + Assert.assertEquals("2014-12-10", records[1]); + } + Assert.assertEquals(7, recordsInDatePartition2.size()); + for (int i = 0; i < recordsInDatePartition2.size(); i++) { + LOG.debug("records in second date partition are {}", recordsInDatePartition2.get(i)); + Object record = recordsInDatePartition2.get(i); + Object[] records = (Object[])record; + Assert.assertEquals("2014-12-11", records[1]); + } + Assert.assertEquals(10, recordsInDatePartition3.size()); + for (int i = 0; i < recordsInDatePartition3.size(); i++) { + LOG.debug("records in second date partition are {}", recordsInDatePartition3.get(i)); + Object record = recordsInDatePartition3.get(i); + Object[] records = (Object[])record; + Assert.assertEquals("2014-12-12", records[1]); + } + + } + + public class InnerObj + { + public InnerObj() + { + } + + private int id; + private String date; + + public String getDate() + { + return date; + } + + public void setDate(String date) + { + this.date = date; + } + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + } + + private static final Logger LOG = LoggerFactory.getLogger(HiveMockTest.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java ---------------------------------------------------------------------- diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java b/hive/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java new file mode 100755 index 0000000..4f32899 --- /dev/null +++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.hive; + +import org.junit.Assert; +import org.junit.Test; + +public class HiveStoreTest +{ + @Test + public void defaultDriverTest() + { + HiveStore hiveStore = new HiveStore(); + Assert.assertEquals("Test default driver", HiveStore.HIVE_DRIVER, hiveStore.getDatabaseDriver()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 627168b..a8e50ad 100644 --- a/pom.xml +++ b/pom.xml @@ -202,6 +202,7 @@ <id>all-modules</id> <modules> <module>kafka</module> + <module>hive</module> <module>stream</module> <module>benchmark</module> <module>apps</module>
