APEXMALHAR-2087 Hive output module

1. Added Hive output module
2. Minor enhancements for getters, setters
3. Fixing some import order, checkstyle violations
4. Moving hive support under separate pom project
5. replacing deprecated calls with recommended calls in the test cases.
6. making test case compatible to parentclass (junit 3) 7. added hive dependency to benchmark 8. added hive to all-modules profile 9. Incorporating review comments Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/bb0a93d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/bb0a93d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/bb0a93d0 Branch: refs/heads/master Commit: bb0a93d02a91723a323fc09c23150cc5f1a4feb6 Parents: a91287c Author: yogidevendra <[email protected]> Authored: Tue May 10 15:28:17 2016 +0530 Committer: yogidevendra <[email protected]> Committed: Mon May 30 11:04:42 2016 +0530 ---------------------------------------------------------------------- benchmark/pom.xml | 60 +- contrib/pom.xml | 20 +- .../hive/AbstractFSRollingOutputOperator.java | 277 -------- .../contrib/hive/FSPojoToHiveOperator.java | 285 --------- .../datatorrent/contrib/hive/HiveOperator.java | 285 --------- .../com/datatorrent/contrib/hive/HiveStore.java | 51 -- .../contrib/hive/HiveStreamCodec.java | 75 --- .../datatorrent/contrib/hive/package-info.java | 20 - .../contrib/hive/FSRollingMapTestImpl.java | 49 -- .../contrib/hive/FSRollingTestImpl.java | 40 -- .../datatorrent/contrib/hive/HiveMockTest.java | 629 ------------------- .../datatorrent/contrib/hive/HiveStoreTest.java | 32 - hive/XmlJavadocCommentsExtractor.xsl | 48 ++ hive/pom.xml | 262 ++++++++ .../hive/AbstractFSRollingOutputOperator.java | 279 ++++++++ .../contrib/hive/FSPojoToHiveOperator.java | 292 +++++++++ .../datatorrent/contrib/hive/HiveOperator.java | 308 +++++++++ .../com/datatorrent/contrib/hive/HiveStore.java | 52 ++ .../contrib/hive/HiveStreamCodec.java | 79 +++ .../datatorrent/contrib/hive/package-info.java | 20 + .../apex/malhar/hive/HiveOutputModule.java | 482 ++++++++++++++ .../contrib/hive/FSRollingMapTestImpl.java | 49 ++ .../contrib/hive/FSRollingTestImpl.java | 39 ++ .../datatorrent/contrib/hive/HiveMockTest.java | 618 ++++++++++++++++++ .../datatorrent/contrib/hive/HiveStoreTest.java | 32 + pom.xml | 1 + 26 files changed, 2586 insertions(+), 1798 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/benchmark/pom.xml ---------------------------------------------------------------------- diff --git a/benchmark/pom.xml b/benchmark/pom.xml index 7569c71..239d9fa 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -204,6 +204,30 @@ </exclusions> </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>malhar-hive</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>malhar-hive</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> @@ -520,42 +544,6 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-jdbc</artifactId> - <version>0.13.1</version> - <optional>true</optional> - <exclusions> - <exclusion> - 
<groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - <version>0.13.1</version> - <optional>true</optional> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-service</artifactId> - <version>0.13.1</version> - <optional>true</optional> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.0</version> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 6691390..fab629c 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -221,7 +221,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>2791</maxAllowedViolations> + <maxAllowedViolations>2709</maxAllowedViolations> <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole> </configuration> </plugin> @@ -566,18 +566,6 @@ </exclusions> </dependency> <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-service</artifactId> - <version>0.13.1</version> - <optional>true</optional> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.apache.solr</groupId> <artifactId>solr-solrj</artifactId> <version>4.10.1</version> @@ -604,12 +592,6 @@ <optional>true</optional> </dependency> <dependency> - <groupId>io.teknek</groupId> - <artifactId>hiveunit</artifactId> - <version>0.0.3</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.rosuda</groupId> <artifactId>rengine</artifactId> <version>1.0</version> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java deleted file mode 100755 index ecfd179..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.contrib.hive; - -import java.io.File; -import java.io.IOException; -import java.util.*; -import java.util.Iterator; -import java.util.concurrent.ExecutionException; - -import javax.validation.constraints.Min; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang3.mutable.MutableInt; - -import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator.CheckpointListener; -import com.datatorrent.api.annotation.Stateless; - -import com.datatorrent.netlet.util.DTThrowable; - -/** - * An implementation of FS Writer that writes text files to hdfs which are inserted - * into hive on committed window callback. HiveStreamCodec is used to make sure that data being sent to a particular hive partition - * goes to a specific operator partition by passing FSRollingOutputOperator to the stream codec. - * Also filename is determined uniquely for each tuple going to a specific hive partition. - * - * @since 2.1.0 - */ -public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T> implements CheckpointListener -{ - private transient String outputFilePath; - protected MutableInt partNumber; - protected HashMap<Long, ArrayList<String>> mapFilenames = new HashMap<Long, ArrayList<String>>(); - protected HashMap<String, ArrayList<String>> mapPartition = new HashMap<String, ArrayList<String>>(); - protected Queue<Long> queueWindows = new LinkedList<Long>(); - protected long windowIDOfCompletedPart = Stateless.WINDOW_ID; - protected long committedWindowId = Stateless.WINDOW_ID; - private boolean isEmptyWindow; - private transient int operatorId; - private int countEmptyWindow; - private ArrayList<String> partition = new ArrayList<String>(); - - //This variable is user configurable. - @Min(0) - private long maxWindowsWithNoData = 100; - - /** - * The output port that will emit a POJO containing file which is committed and specific hive partitions - * in which this file should be loaded to HiveOperator. - */ - public final transient DefaultOutputPort<FilePartitionMapping> outputPort = new DefaultOutputPort<FilePartitionMapping>(); - - public AbstractFSRollingOutputOperator() - { - countEmptyWindow = 0; - HiveStreamCodec<T> hiveCodec = new HiveStreamCodec<T>(); - hiveCodec.rollingOperator = this; - streamCodec = hiveCodec; - } - - @Override - public void setup(OperatorContext context) - { - String appId = context.getValue(DAG.APPLICATION_ID); - operatorId = context.getId(); - outputFilePath = File.separator + appId + File.separator + operatorId; - super.setup(context); - } - - @Override - public void beginWindow(long windowId) - { - isEmptyWindow = true; - windowIDOfCompletedPart = windowId; - } - - /* - * MapFilenames has mapping from completed file to windowId in which it got completed. - * Also maintaining a queue of these windowIds which helps in reducing iteration time in committed callback. 
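The class-level flow above (roll text files on HDFS, then load each completed file into its hive partition on the committed callback) can be wired up roughly as follows. A hedged DAG sketch, not part of this commit: FSRollingTestImpl is the test subclass that appears further down in this diff, the operator and stream names are illustrative, and the input port comes from AbstractStoreOutputOperator.

    // Hedged wiring sketch: committed files flow from the rolling writer to
    // HiveOperator as FilePartitionMapping tuples.
    FSRollingTestImpl writer = dag.addOperator("fsWriter", new FSRollingTestImpl());
    HiveOperator loader = dag.addOperator("hiveLoader", new HiveOperator());
    dag.addStream("filesToHive", writer.outputPort, loader.input);
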
- */ - @Override - protected void rotateHook(String finishedFile) - { - isEmptyWindow = false; - if (mapFilenames.containsKey(windowIDOfCompletedPart)) { - mapFilenames.get(windowIDOfCompletedPart).add(finishedFile); - } - else { - ArrayList<String> listFileNames = new ArrayList<String>(); - listFileNames.add(finishedFile); - mapFilenames.put(windowIDOfCompletedPart, listFileNames); - } - queueWindows.add(windowIDOfCompletedPart); - - } - - /* - * Filenames include operator Id and the specific hive partitions to which the file will be loaded. - * Partition is determined based on tuple and its implementation is left to user. - */ - @Override - protected String getFileName(T tuple) - { - isEmptyWindow = false; - partition = getHivePartition(tuple); - StringBuilder output = new StringBuilder(outputFilePath); - int numPartitions = partition.size(); - if (numPartitions > 0) { - for (int i = 0; i < numPartitions; i++) { - output.append(File.separator).append(partition.get(i)); - } - output.append(File.separator).append(operatorId).append("-transaction.out.part"); - String partFile = getPartFileNamePri(output.toString()); - mapPartition.put(partFile, partition); - } - return output.toString(); - } - - - /* - * Moving completed files into hive on committed window callback. - * Criteria for moving them is that the windowId in which they are completed - * should be less than committed window. - */ - @Override - public void committed(long windowId) - { - committedWindowId = windowId; - Iterator<Long> iterWindows = queueWindows.iterator(); - ArrayList<String> list = new ArrayList<String>(); - while (iterWindows.hasNext()) { - windowId = iterWindows.next(); - if (committedWindowId >= windowId) { - logger.debug("list is {}", mapFilenames.get(windowId)); - list = mapFilenames.get(windowId); - FilePartitionMapping partMap = new FilePartitionMapping(); - if(list!=null){ - for (int i = 0; i < list.size(); i++) { - partMap.setFilename(list.get(i)); - partMap.setPartition(mapPartition.get(list.get(i))); - outputPort.emit(partMap); - } - } - mapFilenames.remove(windowId); - iterWindows.remove(); - } - if (committedWindowId < windowId) { - break; - } - } - } - - @Override - public void checkpointed(long windowId) - { - } - - protected void rotateCall(String lastFile) - { - try { - this.rotate(lastFile); - } - catch (IOException ex) { - logger.debug(ex.getMessage()); - DTThrowable.rethrow(ex); - } - catch (ExecutionException ex) { - logger.debug(ex.getMessage()); - DTThrowable.rethrow(ex); - } - } - - public String getHDFSRollingLastFile() - { - Iterator<String> iterFileNames = this.openPart.keySet().iterator(); - String lastFile = null; - if (iterFileNames.hasNext()) { - lastFile = iterFileNames.next(); - partNumber = this.openPart.get(lastFile); - } - return getPartFileName(lastFile, - partNumber.intValue()); - } - - /** - * This method gets a List of Hive Partitions in which the tuple needs to be written to. - * Example: If hive partitions are date='2014-12-12',country='USA' then - * this method returns {"2014-12-12","USA"} - * The implementation is left to the user. - * @param tuple A received tuple to be written to a hive partition. - * @return ArrayList containing hive partition values. 
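To make the getHivePartition() contract concrete, a minimal subclass sketch mirroring the date/country example in the javadoc above; MyEvent and its getDate()/getCountry() accessors are hypothetical illustration names:

    // Sketch: partition tuples by date and country, as in the javadoc example.
    public class DateCountryRollingOutputOperator extends AbstractFSRollingOutputOperator<MyEvent>
    {
      @Override
      public ArrayList<String> getHivePartition(MyEvent tuple)
      {
        ArrayList<String> partitions = new ArrayList<String>();
        partitions.add(tuple.getDate());    // e.g. "2014-12-12"
        partitions.add(tuple.getCountry()); // e.g. "USA"
        return partitions;
      }

      @Override
      protected byte[] getBytesForTuple(MyEvent tuple)
      {
        return (tuple.toString() + "\n").getBytes();
      }
    }
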
- */ - public abstract ArrayList<String> getHivePartition(T tuple); - - @Override - public void endWindow() - { - if (isEmptyWindow) { - countEmptyWindow++; - } - if (countEmptyWindow >= maxWindowsWithNoData) { - String lastFile = getHDFSRollingLastFile(); - rotateCall(lastFile); - countEmptyWindow = 0; - } - super.endWindow(); - - } - - public long getMaxWindowsWithNoData() - { - return maxWindowsWithNoData; - } - - public void setMaxWindowsWithNoData(long maxWindowsWithNoData) - { - this.maxWindowsWithNoData = maxWindowsWithNoData; - } - - /* - * A POJO which is emitted by output port of AbstractFSRollingOutputOperator implementation in DAG. - * The POJO contains the filename which will not be changed by FSRollingOutputOperator once its emitted. - * The POJO also contains the hive partitions to which the respective files will be moved. - */ - public static class FilePartitionMapping - { - private String filename; - private ArrayList<String> partition = new ArrayList<String>(); - - public ArrayList<String> getPartition() - { - return partition; - } - - public void setPartition(ArrayList<String> partition) - { - this.partition = partition; - } - - public String getFilename() - { - return filename; - } - - public void setFilename(String filename) - { - this.filename = filename; - } - - } - - private static final Logger logger = LoggerFactory.getLogger(AbstractFSRollingOutputOperator.class); - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java deleted file mode 100644 index 8571c18..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.contrib.hive; - -import com.datatorrent.lib.util.PojoUtils; -import com.datatorrent.lib.util.PojoUtils.Getter; -import com.datatorrent.lib.util.PojoUtils.GetterBoolean; -import com.datatorrent.lib.util.PojoUtils.GetterChar; -import com.datatorrent.lib.util.PojoUtils.GetterDouble; -import com.datatorrent.lib.util.PojoUtils.GetterFloat; -import com.datatorrent.lib.util.PojoUtils.GetterInt; -import com.datatorrent.lib.util.PojoUtils.GetterLong; -import com.datatorrent.lib.util.PojoUtils.GetterShort; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.sql.Date; - -/** - * An Implementation of AbstractFSRollingOutputOperator which takes any POJO as input, serializes the POJO as Hive delimiter separated values - * which are written in text files to hdfs, and are inserted into hive on committed window callback.This operator can handle outputting to multiple files when the output file depends on the tuple. - * - * @displayName: FS To Hive Operator - * @category Output - * @tags fs, hive, database - * @since 3.0.0 - */ -public class FSPojoToHiveOperator extends AbstractFSRollingOutputOperator<Object> -{ - private static final long serialVersionUID = 1L; - private ArrayList<String> hivePartitionColumns; - private ArrayList<String> hiveColumns; - private ArrayList<FIELD_TYPE> hiveColumnDataTypes; - private ArrayList<FIELD_TYPE> hivePartitionColumnDataTypes; - private transient ArrayList<Object> getters; - private ArrayList<String> expressionsForHiveColumns; - private ArrayList<String> expressionsForHivePartitionColumns; - - public ArrayList<String> getExpressionsForHivePartitionColumns() - { - return expressionsForHivePartitionColumns; - } - - public void setExpressionsForHivePartitionColumns(ArrayList<String> expressionsForHivePartitionColumns) - { - this.expressionsForHivePartitionColumns = expressionsForHivePartitionColumns; - } - - - /* - * A list of Java expressions in which each expression yields the specific table column value and partition column value in hive table from the input POJO. 
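As an illustration of the expression lists described above, a configuration sketch; the column names and getter expressions are taken from the POJO test case later in this diff, and java.util.Arrays is an assumed import:

    // Sketch: map POJO getters to hive data and partition columns.
    FSPojoToHiveOperator fsOp = new FSPojoToHiveOperator();
    fsOp.setHiveColumns(new ArrayList<String>(Arrays.asList("col1")));
    fsOp.setExpressionsForHiveColumns(new ArrayList<String>(Arrays.asList("getId()")));
    fsOp.setHivePartitionColumns(new ArrayList<String>(Arrays.asList("dt")));
    fsOp.setExpressionsForHivePartitionColumns(new ArrayList<String>(Arrays.asList("getDate()")));
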
- */ - public ArrayList<String> getExpressionsForHiveColumns() - { - return expressionsForHiveColumns; - } - - public void setExpressionsForHiveColumns(ArrayList<String> expressions) - { - this.expressionsForHiveColumns = expressions; - } - - public FSPojoToHiveOperator() - { - super(); - getters = new ArrayList<Object>(); - } - - @SuppressWarnings("unchecked") - private void getValue(Object tuple, int index, FIELD_TYPE type, StringBuilder value) - { - switch (type) { - case CHARACTER: - value.append(((GetterChar<Object>)getters.get(index)).get(tuple)); - break; - case STRING: - value.append(((Getter<Object, String>)getters.get(index)).get(tuple)); - break; - case BOOLEAN: - value.append(((GetterBoolean<Object>)getters.get(index)).get(tuple)); - break; - case SHORT: - value.append(((GetterShort<Object>)getters.get(index)).get(tuple)); - break; - case INTEGER: - value.append(((GetterInt<Object>)getters.get(index)).get(tuple)); - break; - case LONG: - value.append(((GetterLong<Object>)getters.get(index)).get(tuple)); - break; - case FLOAT: - value.append(((GetterFloat<Object>)getters.get(index)).get(tuple)); - break; - case DOUBLE: - value.append(((GetterDouble<Object>)getters.get(index)).get(tuple)); - break; - case DATE: - value.append(((Getter<Object, Date>)getters.get(index)).get(tuple)); - break; - case TIMESTAMP: - value.append(((Getter<Object, Timestamp>)getters.get(index)).get(tuple)); - break; - case OTHER: - value.append(((Getter<Object, Object>)getters.get(index)).get(tuple)); - break; - default: - throw new RuntimeException("unsupported data type " + type); - } - } - - public enum FIELD_TYPE - { - BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE, TIMESTAMP, OTHER - }; - - /* - * Columns in Hive table. - */ - public ArrayList<String> getHiveColumns() - { - return hiveColumns; - } - - public void setHiveColumns(ArrayList<String> hiveColumns) - { - this.hiveColumns = hiveColumns; - } - - /* - * Partition Columns in Hive table.Example: dt for date,ts for timestamp - */ - public ArrayList<String> getHivePartitionColumns() - { - return hivePartitionColumns; - } - - public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns) - { - this.hivePartitionColumns = hivePartitionColumns; - } - - /* - * Data Types of Hive table data columns. - * Example: If the Hive table has two columns of data type int and float, - * then hiveColumnsDataTypes = {INTEGER,FLOAT}. - * Particular Data Type can be chosen from the List of data types provided. - */ - public ArrayList<FIELD_TYPE> getHiveColumnDataTypes() - { - return hiveColumnDataTypes; - } - - public void setHiveColumnDataTypes(ArrayList<FIELD_TYPE> hiveColumnDataTypes) - { - this.hiveColumnDataTypes = hiveColumnDataTypes; - } - - /* - * Data Types of Hive table Partition Columns. - * Example: If the Hive table has two columns of data type int and float and is partitioned by date of type String, - * then hivePartitionColumnDataTypes = {STRING}. - * Particular Data Type can be chosen from the List of data types provided. 
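Matching the javadoc example above (int and float data columns, a String-typed date partition column), and continuing the hypothetical fsOp instance from the previous sketch:

    // Sketch: declare the data-column and partition-column types.
    ArrayList<FIELD_TYPE> columnTypes = new ArrayList<FIELD_TYPE>();
    columnTypes.add(FIELD_TYPE.INTEGER);
    columnTypes.add(FIELD_TYPE.FLOAT);
    fsOp.setHiveColumnDataTypes(columnTypes);

    ArrayList<FIELD_TYPE> partitionTypes = new ArrayList<FIELD_TYPE>();
    partitionTypes.add(FIELD_TYPE.STRING);
    fsOp.setHivePartitionColumnDataTypes(partitionTypes);
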
- */ - public ArrayList<FIELD_TYPE> getHivePartitionColumnDataTypes() - { - return hivePartitionColumnDataTypes; - } - - public void setHivePartitionColumnDataTypes(ArrayList<FIELD_TYPE> hivePartitionColumnDataTypes) - { - this.hivePartitionColumnDataTypes = hivePartitionColumnDataTypes; - } - - @Override - @SuppressWarnings("unchecked") - public ArrayList<String> getHivePartition(Object tuple) - { - int sizeOfColumns = hiveColumns.size(); - int sizeOfPartitionColumns = hivePartitionColumns.size(); - int size = sizeOfColumns + sizeOfPartitionColumns; - ArrayList<String> hivePartitionColumnValues = new ArrayList<String>(); - String partitionColumnValue; - for (int i = sizeOfColumns; i < size; i++) { - // FIELD_TYPE type = hiveColumnsDataTypes.get(i); - //partitionColumnValue = getValue(tuple, sizeOfColumns, type); - partitionColumnValue = ((Getter<Object, String>)getters.get(i)).get(tuple); - hivePartitionColumnValues.add(partitionColumnValue); - } - return hivePartitionColumnValues; - } - - @Override - public void processTuple(Object tuple) - { - if (getters.isEmpty()) { - processFirstTuple(tuple); - } - super.processTuple(tuple); - } - - @SuppressWarnings("unchecked") - public void processFirstTuple(Object tuple) - { - Class<?> fqcn = tuple.getClass(); - createGetters(fqcn, hiveColumns.size(), expressionsForHiveColumns,hiveColumnDataTypes); - createGetters(fqcn, hivePartitionColumns.size(), expressionsForHivePartitionColumns,hivePartitionColumnDataTypes); - } - - protected void createGetters(Class<?> fqcn, int size, ArrayList<String> expressions,ArrayList<FIELD_TYPE> columnDataTypes) - { - for (int i = 0; i < size; i++) { - FIELD_TYPE type = columnDataTypes.get(i); - final Object getter; - final String getterExpression = expressions.get(i); - switch (type) { - case CHARACTER: - getter = PojoUtils.createGetterChar(fqcn, getterExpression); - break; - case STRING: - getter = PojoUtils.createGetter(fqcn, getterExpression, String.class); - break; - case BOOLEAN: - getter = PojoUtils.createGetterBoolean(fqcn, getterExpression); - break; - case SHORT: - getter = PojoUtils.createGetterShort(fqcn, getterExpression); - break; - case INTEGER: - getter = PojoUtils.createGetterInt(fqcn, getterExpression); - break; - case LONG: - getter = PojoUtils.createGetterLong(fqcn, getterExpression); - break; - case FLOAT: - getter = PojoUtils.createGetterFloat(fqcn, getterExpression); - break; - case DOUBLE: - getter = PojoUtils.createGetterDouble(fqcn, getterExpression); - break; - case DATE: - getter = PojoUtils.createGetter(fqcn, getterExpression, Date.class); - break; - case TIMESTAMP: - getter = PojoUtils.createGetter(fqcn, getterExpression, Timestamp.class); - break; - default: - getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class); - } - - getters.add(getter); - - } - - } - - @Override - @SuppressWarnings("unchecked") - protected byte[] getBytesForTuple(Object tuple) - { - int size = hiveColumns.size(); - StringBuilder result = new StringBuilder(); - for (int i = 0; i < size; i++) { - FIELD_TYPE type = hiveColumnDataTypes.get(i); - getValue(tuple, i, type, result); - result.append("\n"); - } - return (result.toString()).getBytes(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java deleted file mode 100755 index c8fd913..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.hive; - -import java.sql.SQLException; -import java.sql.Statement; -import java.util.*; - -import javax.annotation.Nonnull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.db.AbstractStoreOutputOperator; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.annotation.OperatorAnnotation; -import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitionMapping; -import com.datatorrent.lib.counters.BasicCounters; -import java.io.IOException; -import org.apache.commons.lang.mutable.MutableLong; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -/** - * Hive operator which can insert data in txt format in tables/partitions from a file written in hdfs location. - * The file contains data of the same data type as the hive tables created by user and is already committed. - * No changes will be made to the input file once its given to HiveOperator. - * This is a fault tolerant implementation of HiveOperator which assumes that load operation - * is an atomic operation in Hive. - * - * @category Output - * @tags database, sql, hive - * - * @since 2.1.0 - */ -@OperatorAnnotation(checkpointableWithinAppWindow = false) -public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMapping, HiveStore> -{ - //This Property is user configurable. - protected ArrayList<String> hivePartitionColumns = new ArrayList<String>(); - private transient String localString = ""; - protected HiveStore hivestore; - - /** - * The file system used to write to. - */ - protected transient FileSystem fs; - @Nonnull - protected String tablename; - @Nonnull - protected String hivepath; - /** - * This is the operator context passed at setup. - */ - private transient OperatorContext context; - - /** - * File output counters. - */ - private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class); - - - /** - * The total time in milliseconds the operator has been running for. - */ - private long totalTime; - /** - * Last time stamp collected. - */ - private long lastTimeStamp; - /** - * The total number of bytes written by the operator. 
- */ - protected long totalBytesWritten = 0; - - @Override - public void setup(OperatorContext context) - { - try { - fs = getHDFSInstance(); - } - catch (IOException ex) { - throw new RuntimeException(ex); - } - - this.context = context; - lastTimeStamp = System.currentTimeMillis(); - - fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, - new MutableLong()); - fileCounters.setCounter(Counters.TOTAL_TIME_ELAPSED, - new MutableLong()); - super.setup(context); - } - - /** - * Override this method to change the FileSystem instance that is used by the operator. - * - * @return A FileSystem object. - * @throws IOException - */ - protected FileSystem getHDFSInstance() throws IOException - { - FileSystem tempFS = FileSystem.newInstance(new Path(hivestore.filepath).toUri(), new Configuration()); - if (!tempFS.getScheme().equalsIgnoreCase("hdfs")) { - localString = " local"; - } - return tempFS; - } - - /** - * Function to process each incoming tuple. - * The input is FilePartitionMapping which is a POJO containing filename which is already committed - * and will not be changed.The POJO also contains the hive partitions to which the respective files will be moved. - * - * @param tuple incoming tuple which has filename and hive partition. - */ - @Override - public void processTuple(FilePartitionMapping tuple) - { - String command = processHiveFile(tuple); - logger.debug("commands is {}",command); - if (command != null) { - Statement stmt; - try { - stmt = hivestore.getConnection().createStatement(); - stmt.execute(command); - } - catch (SQLException ex) { - throw new RuntimeException("Moving file into hive failed" + ex); - } - } - - } - - /* - * This function extracts the filename and partitions to which the file will be loaded. - * It returns the command to be executed by hive. 
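For reference, the statement assembled by processHiveFile() below has this shape; a worked example assuming an hdfs filepath (so localString stays empty), tablename "temp", hivePartitionColumns = {"dt"} and partition value "2014-12-10":

    load data inpath '<filepath>/appId/0/2014-12-10/0-transaction.out.part.0' into table temp PARTITION( dt='2014-12-10' )
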
- */ - private String processHiveFile(FilePartitionMapping tuple) - { - String filename = tuple.getFilename(); - ArrayList<String> partition = tuple.getPartition(); - String command = null; - String filepath = hivestore.getFilepath() + Path.SEPARATOR + filename; - logger.debug("processing {} filepath", filepath); - int numPartitions = partition.size(); - try { - if (fs.exists(new Path(filepath))) { - if (numPartitions > 0) { - StringBuilder partitionString = new StringBuilder(hivePartitionColumns.get(0) + "='" + partition.get(0) + "'"); - int i = 0; - while (i < numPartitions) { - i++; - if (i == numPartitions) { - break; - } - partitionString.append(",").append(hivePartitionColumns.get(i)).append("='").append(partition.get(i)).append("'"); - } - if (i < hivePartitionColumns.size()) { - partitionString.append(",").append(hivePartitionColumns.get(i)); - } - command = "load data" + localString + " inpath '" + filepath + "' into table " + tablename + " PARTITION" + "( " + partitionString + " )"; - } - else { - command = "load data" + localString + " inpath '" + filepath + "' into table " + tablename; - } - } - } - catch (IOException e) { - throw new RuntimeException(e); - } - logger.debug("command is {}" , command); - return command; - } - - @Override - public void endWindow() - { - long currentTimeStamp = System.currentTimeMillis(); - totalTime += currentTimeStamp - lastTimeStamp; - lastTimeStamp = currentTimeStamp; - - fileCounters.getCounter(Counters.TOTAL_TIME_ELAPSED).setValue(totalTime); - fileCounters.getCounter(Counters.TOTAL_BYTES_WRITTEN).setValue(totalBytesWritten); - context.setCounters(fileCounters); - } - - public void teardown() - { - long currentTimeStamp = System.currentTimeMillis(); - totalTime += currentTimeStamp - lastTimeStamp; - lastTimeStamp = currentTimeStamp; - } - - /** - * Get the partition columns in hive to which data needs to be loaded. - * @return List of Hive Partition Columns - */ - public ArrayList<String> getHivePartitionColumns() - { - return hivePartitionColumns; - } - - /** - * Set the hive partition columns to which data needs to be loaded. - * @param hivePartitionColumns - */ - public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns) - { - this.hivePartitionColumns = hivePartitionColumns; - } - - /** - * Get the table name in hive. - * @return table name - */ - public String getTablename() - { - return tablename; - } - - /** - * Set the table name in hive. - * @param tablename - */ - public void setTablename(String tablename) - { - this.tablename = tablename; - } - - /** - * Gets the store set for hive; - * @return hive store - */ - public HiveStore getHivestore() - { - return hivestore; - } - - /** - * Set the store in hive. - * - * @param hivestore - */ - public void setHivestore(HiveStore hivestore) - { - this.hivestore = hivestore; - super.setStore(hivestore); - } - - public static enum Counters - { - /** - * An enum for counters representing the total number of bytes written - * by the operator. - */ - TOTAL_BYTES_WRITTEN, - - /** - * An enum for counters representing the total time the operator has - * been operational for. 
- */ - TOTAL_TIME_ELAPSED - } - private static final Logger logger = LoggerFactory.getLogger(HiveOperator.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/main/java/com/datatorrent/contrib/hive/HiveStore.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hive/HiveStore.java b/contrib/src/main/java/com/datatorrent/contrib/hive/HiveStore.java deleted file mode 100755 index 3b25e49..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/hive/HiveStore.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.hive; - -import com.datatorrent.lib.db.jdbc.JdbcStore; -import javax.validation.constraints.NotNull; - -/** - * Hive Store that extends Jdbc Store and provides its own driver name. - * - * @since 2.1.0 - */ -public class HiveStore extends JdbcStore -{ - public HiveStore() - { - super(); - this.setDatabaseDriver(HIVE_DRIVER); - } - - public static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; - @NotNull - public String filepath; - - public String getFilepath() - { - return filepath; - } - - public void setFilepath(String filepath) - { - this.filepath = filepath; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java b/contrib/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java deleted file mode 100755 index 8da7556..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/hive/HiveStreamCodec.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
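Taken together with HiveStore above, a hedged wiring sketch; the URL and staging path are placeholders, while the jdbc:hive2 URL scheme, driver default, table name and partition column follow the test code later in this diff:

    // Sketch: configure a HiveStore and hand it to HiveOperator.
    HiveStore store = new HiveStore();           // driver defaults to org.apache.hive.jdbc.HiveDriver
    store.setDatabaseUrl("jdbc:hive2://localhost:10000/default"); // placeholder URL
    store.setFilepath("/user/app/hive-staging"); // HDFS dir where rolling output files land
    HiveOperator hiveOperator = new HiveOperator();
    hiveOperator.setHivestore(store);            // also sets the underlying store
    hiveOperator.setTablename("temp");
    hiveOperator.setHivePartitionColumns(new ArrayList<String>(Arrays.asList("dt")));
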
- */ -package com.datatorrent.contrib.hive; - -import java.io.*; - -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -import com.datatorrent.lib.codec.KryoSerializableStreamCodec; - -/** - * Stream codec for uniform distribution of tuples on upstream operator. - * This is used to make sure that data being sent to a particular hive partition - * goes to a specific operator partition by passing FSRollingOutputOperator to the stream codec. - * - * @since 2.1.0 - */ -public class HiveStreamCodec<T> extends KryoSerializableStreamCodec<T> implements Externalizable -{ - private static final long serialVersionUID = 201412121604L; - - protected AbstractFSRollingOutputOperator<T> rollingOperator; - - @Override - public void writeExternal(ObjectOutput out) throws IOException - { - - ByteArrayOutputStream os = new ByteArrayOutputStream(); - ObjectOutputStream obj = new ObjectOutputStream(os); - Output output = new Output(obj); - kryo.writeClassAndObject(output, rollingOperator); - byte[] outBytes = output.toBytes(); - out.writeInt(outBytes.length); - out.write(outBytes, 0, outBytes.length); - out.flush(); - } - - @Override - @SuppressWarnings("unchecked") - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException - { - int size = in.readInt(); - byte[] data = new byte[size]; - in.readFully(data); - Input input = new Input(data); - input.setBuffer(data); - rollingOperator = (AbstractFSRollingOutputOperator)kryo.readClassAndObject(input); - } - - - @Override - public int getPartition(T o) - { - return rollingOperator.getHivePartition(o).hashCode(); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/main/java/com/datatorrent/contrib/hive/package-info.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hive/package-info.java b/contrib/src/main/java/com/datatorrent/contrib/hive/package-info.java deleted file mode 100644 index ad93d8f..0000000 --- a/contrib/src/main/java/com/datatorrent/contrib/hive/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ [email protected] -package com.datatorrent.contrib.hive; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java b/contrib/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java deleted file mode 100755 index 843adbb..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/hive/FSRollingMapTestImpl.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.hive; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; - -public class FSRollingMapTestImpl extends AbstractFSRollingOutputOperator<Map<String,Object>> -{ - @Override - public ArrayList<String> getHivePartition(Map<String,Object> tuple) - { - ArrayList<String> hivePartitions = new ArrayList<String>(); - hivePartitions.add("2014-12-10"); - return(hivePartitions); - } - - @Override - protected byte[] getBytesForTuple(Map<String, Object> tuple) - { - Iterator<String> keyIter = tuple.keySet().iterator(); - StringBuilder writeToHive = new StringBuilder(""); - - while (keyIter.hasNext()) { - String key = keyIter.next(); - Object obj = tuple.get(key); - writeToHive.append(key).append(":").append(obj).append("\n"); - } - return writeToHive.toString().getBytes(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java b/contrib/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java deleted file mode 100755 index d2bfd34..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/hive/FSRollingTestImpl.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.contrib.hive; - -import java.util.ArrayList; - - -public class FSRollingTestImpl extends AbstractFSRollingOutputOperator<String> -{ - @Override - public ArrayList<String> getHivePartition(String tuple) - { - ArrayList<String> hivePartitions = new ArrayList<String>(); - hivePartitions.add(tuple); - return(hivePartitions); - } - - @Override - protected byte[] getBytesForTuple(String tuple) - { - return (tuple + "\n").getBytes(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java b/contrib/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java deleted file mode 100755 index c552072..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java +++ /dev/null @@ -1,629 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.contrib.hive; - -import io.teknek.hiveunit.HiveTestService; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.lib.helper.OperatorContextTestHelper; - -import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator.FilePartitionMapping; - -import com.datatorrent.api.Attribute.AttributeMap; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.Operator.ProcessingMode; -import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE; -import com.datatorrent.lib.util.TestUtils.TestInfo; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.serializers.FieldSerializer; -import java.io.File; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.thrift.TException; -import org.junit.Rule; -import org.junit.runner.Description; - -public class HiveMockTest extends HiveTestService -{ - public static final String APP_ID = "HiveOperatorTest"; - - public static final int OPERATOR_ID = 0; - public static final int NUM_WINDOWS = 10; - public static final int BLAST_SIZE = 10; - public static final int DATABASE_SIZE = NUM_WINDOWS * BLAST_SIZE; - public static final String tablename = "temp"; - public static final String tablepojo = "temppojo"; - public static final String tablemap = "tempmap"; - public static String delimiterMap = ":"; - public static final String HOST = "localhost"; - public static final String PORT = "10000"; - public static final String DATABASE = "default"; - public static final String HOST_PREFIX = "jdbc:hive://"; - - @Rule - public TestInfo testMeta = new HiveTestWatcher(); - - public static class HiveTestWatcher extends TestInfo - { - @Override - public String getDir() - { - String filePath = new File("target/hive").getAbsolutePath(); - LOG.debug("filepath is {}", filePath); - return filePath; - } - - @Override - protected void starting(Description description) - { - super.starting(description); - new File(getDir()).mkdir(); - } - - @Override - protected void finished(Description description) - { - super.finished(description); - FileUtils.deleteQuietly(new File(getDir())); - } - - } - - public HiveMockTest() throws IOException, Exception - { - super(); - } - - public static HiveStore createStore(HiveStore hiveStore) - { - String host = HOST; - String port = PORT; - - if (hiveStore == null) { - hiveStore = new HiveStore(); - } - - StringBuilder sb = new StringBuilder(); - String tempHost = HOST_PREFIX + host + ":" + port; - - LOG.debug("Host name: {}", tempHost); - LOG.debug("Port: {}", 10000); - - sb.append("user:").append("").append(","); - sb.append("password:").append(""); - - String properties = sb.toString(); - LOG.debug(properties); - hiveStore.setDatabaseDriver("org.apache.hive.jdbc.HiveDriver"); - - hiveStore.setDatabaseUrl("jdbc:hive2://"); - hiveStore.setConnectionProperties(properties); - return hiveStore; - } - - public static void hiveInitializeDatabase(HiveStore hiveStore) throws SQLException - { - hiveStore.connect(); - Statement stmt = hiveStore.getConnection().createStatement(); - // show tables - String sql = "show tables"; - - LOG.debug(sql); - ResultSet res = stmt.executeQuery(sql); - if (res.next()) { - 
LOG.debug("tables are {}", res.getString(1)); - } - - stmt.execute("DROP TABLE " + tablename); - - stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 String) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n" - + "STORED AS TEXTFILE "); - /*ResultSet res = stmt.execute("CREATE TABLE IF NOT EXISTS temp4 (col1 map<string,int>,col2 map<string,int>,col3 map<string,int>,col4 map<String,timestamp>, col5 map<string,double>,col6 map<string,double>,col7 map<string,int>,col8 map<string,int>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' \n" - + "COLLECTION ITEMS TERMINATED BY '\n' \n" - + "MAP KEYS TERMINATED BY ':' \n" - + "LINES TERMINATED BY '\n' " - + "STORED AS TEXTFILE");*/ - - hiveStore.disconnect(); - } - - public static void hiveInitializePOJODatabase(HiveStore hiveStore) throws SQLException - { - hiveStore.connect(); - Statement stmt = hiveStore.getConnection().createStatement(); - // show tables - String sql = "show tables"; - - LOG.debug(sql); - ResultSet res = stmt.executeQuery(sql); - if (res.next()) { - LOG.debug("tables are {}", res.getString(1)); - } - stmt.execute("DROP TABLE " + tablepojo); - - stmt.execute("CREATE TABLE " + tablepojo + " (col1 int) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n" - + "STORED AS TEXTFILE "); - hiveStore.disconnect(); - } - - public static void hiveInitializeMapDatabase(HiveStore hiveStore) throws SQLException - { - hiveStore.connect(); - Statement stmt = hiveStore.getConnection().createStatement(); - // show tables - String sql = "show tables"; - - LOG.debug(sql); - ResultSet res = stmt.executeQuery(sql); - if (res.next()) { - LOG.debug(res.getString(1)); - } - - stmt.execute("DROP TABLE " + tablemap); - - stmt.execute("CREATE TABLE IF NOT EXISTS " + tablemap + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n" - + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n" - + "STORED AS TEXTFILE "); - hiveStore.disconnect(); - } - - @Override - public void setUp() throws Exception - { - System.setProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.toString(), testMeta.getDir()); - super.setUp(); - } - - @Test - public void testInsertString() throws Exception - { - HiveStore hiveStore = createStore(null); - hiveStore.setFilepath(testMeta.getDir()); - ArrayList<String> hivePartitionColumns = new ArrayList<String>(); - hivePartitionColumns.add("dt"); - hiveInitializeDatabase(createStore(null)); - HiveOperator hiveOperator = new HiveOperator(); - hiveOperator.setHivestore(hiveStore); - hiveOperator.setTablename(tablename); - hiveOperator.setHivePartitionColumns(hivePartitionColumns); - - FSRollingTestImpl fsRolling = new FSRollingTestImpl(); - fsRolling.setFilePath(testMeta.getDir()); - short permission = 511; - fsRolling.setFilePermission(permission); - fsRolling.setMaxLength(128); - AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); - attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); - attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); - - fsRolling.setup(context); - hiveOperator.setup(context); - FilePartitionMapping mapping1 = new FilePartitionMapping(); - FilePartitionMapping mapping2 = new FilePartitionMapping(); - mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + 
"2014-12-10" + "/" + "0-transaction.out.part.0"); - ArrayList<String> partitions1 = new ArrayList<String>(); - partitions1.add("2014-12-10"); - mapping1.setPartition(partitions1); - ArrayList<String> partitions2 = new ArrayList<String>(); - partitions2.add("2014-12-11"); - mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0"); - mapping2.setPartition(partitions2); - for (int wid = 0, total = 0; - wid < NUM_WINDOWS; - wid++) { - fsRolling.beginWindow(wid); - for (int tupleCounter = 0; - tupleCounter < BLAST_SIZE && total < DATABASE_SIZE; - tupleCounter++, total++) { - fsRolling.input.process("2014-12-1" + tupleCounter); - } - if (wid == 7) { - fsRolling.committed(wid - 1); - hiveOperator.processTuple(mapping1); - hiveOperator.processTuple(mapping2); - } - - fsRolling.endWindow(); - } - - fsRolling.teardown(); - hiveStore.connect(); - client.execute("select * from " + tablename + " where dt='2014-12-10'"); - List<String> recordsInDatePartition1 = client.fetchAll(); - - client.execute("select * from " + tablename + " where dt='2014-12-11'"); - List<String> recordsInDatePartition2 = client.fetchAll(); - client.execute("drop table " + tablename); - hiveStore.disconnect(); - - Assert.assertEquals(7, recordsInDatePartition1.size()); - for (int i = 0; i < recordsInDatePartition1.size(); i++) { - LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i)); - /*An array containing partition and data is returned as a string record, hence we need to upcast it to an object first - and then downcast to a string in order to use in Assert.*/ - Object record = recordsInDatePartition1.get(i); - Object[] records = (Object[])record; - Assert.assertEquals("2014-12-10", records[1]); - } - Assert.assertEquals(7, recordsInDatePartition2.size()); - for (int i = 0; i < recordsInDatePartition2.size(); i++) { - LOG.debug("records in second date partition are {}", recordsInDatePartition2.get(i)); - Object record = recordsInDatePartition2.get(i); - Object[] records = (Object[])record; - Assert.assertEquals("2014-12-11", records[1]); - } - } - - @Test - public void testInsertPOJO() throws Exception - { - HiveStore hiveStore = createStore(null); - hiveStore.setFilepath(testMeta.getDir()); - ArrayList<String> hivePartitionColumns = new ArrayList<String>(); - hivePartitionColumns.add("dt"); - ArrayList<String> hiveColumns = new ArrayList<String>(); - hiveColumns.add("col1"); - hiveInitializePOJODatabase(createStore(null)); - HiveOperator hiveOperator = new HiveOperator(); - hiveOperator.setHivestore(hiveStore); - hiveOperator.setTablename(tablepojo); - hiveOperator.setHivePartitionColumns(hivePartitionColumns); - - FSPojoToHiveOperator fsRolling = new FSPojoToHiveOperator(); - fsRolling.setFilePath(testMeta.getDir()); - fsRolling.setHiveColumns(hiveColumns); - ArrayList<FIELD_TYPE> fieldtypes = new ArrayList<FIELD_TYPE>(); - ArrayList<FIELD_TYPE> partitiontypes = new ArrayList<FIELD_TYPE>(); - fieldtypes.add(FIELD_TYPE.INTEGER); - partitiontypes.add(FIELD_TYPE.STRING); - fsRolling.setHiveColumnDataTypes(fieldtypes); - fsRolling.setHivePartitionColumnDataTypes(partitiontypes); - //ArrayList<FIELD_TYPE> partitionColumnType = new ArrayList<FIELD_TYPE>(); - //partitionColumnType.add(FIELD_TYPE.STRING); - fsRolling.setHivePartitionColumns(hivePartitionColumns); - // fsRolling.setHivePartitionColumnsDataTypes(partitionColumnType); - ArrayList<String> expressions = new ArrayList<String>(); - expressions.add("getId()"); - ArrayList<String> 
expressionsPartitions = new ArrayList<String>(); - - expressionsPartitions.add("getDate()"); - short permission = 511; - fsRolling.setFilePermission(permission); - fsRolling.setMaxLength(128); - fsRolling.setExpressionsForHiveColumns(expressions); - fsRolling.setExpressionsForHivePartitionColumns(expressionsPartitions); - AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); - attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); - attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L); - attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); - - fsRolling.setup(context); - hiveOperator.setup(context); - FilePartitionMapping mapping1 = new FilePartitionMapping(); - FilePartitionMapping mapping2 = new FilePartitionMapping(); - mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0"); - ArrayList<String> partitions1 = new ArrayList<String>(); - partitions1.add("2014-12-11"); - mapping1.setPartition(partitions1); - ArrayList<String> partitions2 = new ArrayList<String>(); - partitions2.add("2014-12-12"); - mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-12" + "/" + "0-transaction.out.part.0"); - mapping2.setPartition(partitions2); - for (int wid = 0, total = 0; - wid < NUM_WINDOWS; - wid++) { - fsRolling.beginWindow(wid); - for (int tupleCounter = 1; - tupleCounter < BLAST_SIZE && total < DATABASE_SIZE; - tupleCounter++, total++) { - InnerObj innerObj = new InnerObj(); - innerObj.setId(tupleCounter); - innerObj.setDate("2014-12-1" + tupleCounter); - fsRolling.input.process(innerObj); - } - if (wid == 7) { - fsRolling.committed(wid - 1); - hiveOperator.processTuple(mapping1); - hiveOperator.processTuple(mapping2); - } - - fsRolling.endWindow(); - } - - fsRolling.teardown(); - hiveStore.connect(); - client.execute("select * from " + tablepojo + " where dt='2014-12-11'"); - List<String> recordsInDatePartition1 = client.fetchAll(); - - client.execute("select * from " + tablepojo + " where dt='2014-12-12'"); - List<String> recordsInDatePartition2 = client.fetchAll(); - client.execute("drop table " + tablepojo); - hiveStore.disconnect(); - - Assert.assertEquals(7, recordsInDatePartition1.size()); - for (int i = 0; i < recordsInDatePartition1.size(); i++) { - LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i)); - /*An array containing partition and data is returned as a string record, hence we need to upcast it to an object first - and then downcast to a string in order to use in Assert.*/ - Object record = recordsInDatePartition1.get(i); - Object[] records = (Object[])record; - Assert.assertEquals(1, records[0]); - Assert.assertEquals("2014-12-11", records[1]); - } - Assert.assertEquals(7, recordsInDatePartition2.size()); - for (int i = 0; i < recordsInDatePartition2.size(); i++) { - LOG.debug("records in second date partition are {}", recordsInDatePartition2.get(i)); - Object record = recordsInDatePartition2.get(i); - Object[] records = (Object[])record; - Assert.assertEquals(2, records[0]); - Assert.assertEquals("2014-12-12", records[1]); - } - } - - @Test - public void testHiveInsertMapOperator() throws SQLException, TException - { - HiveStore hiveStore = createStore(null); - hiveStore.setFilepath(testMeta.getDir()); - ArrayList<String> hivePartitionColumns = new ArrayList<String>(); - 
-  @Test
-  public void testHiveInsertMapOperator() throws SQLException, TException
-  {
-    HiveStore hiveStore = createStore(null);
-    hiveStore.setFilepath(testMeta.getDir());
-    ArrayList<String> hivePartitionColumns = new ArrayList<String>();
-    hivePartitionColumns.add("dt");
-    hiveInitializeMapDatabase(createStore(null));
-    HiveOperator hiveOperator = new HiveOperator();
-    hiveOperator.setHivestore(hiveStore);
-    hiveOperator.setTablename(tablemap);
-    hiveOperator.setHivePartitionColumns(hivePartitionColumns);
-
-    FSRollingMapTestImpl fsRolling = new FSRollingMapTestImpl();
-    fsRolling.setFilePath(testMeta.getDir());
-    short permission = 511;
-    fsRolling.setFilePermission(permission);
-    fsRolling.setMaxLength(128);
-    AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
-    attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
-    attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
-
-    fsRolling.setup(context);
-    hiveOperator.setup(context);
-    HashMap<String, Object> map = new HashMap<String, Object>();
-    FilePartitionMapping mapping1 = new FilePartitionMapping();
-    FilePartitionMapping mapping2 = new FilePartitionMapping();
-    ArrayList<String> partitions1 = new ArrayList<String>();
-    partitions1.add("2014-12-10");
-    mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-10" + "/" + "0-transaction.out.part.0");
-    mapping1.setPartition(partitions1);
-    ArrayList<String> partitions2 = new ArrayList<String>();
-    partitions2.add("2014-12-11");
-    mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0");
-    mapping2.setPartition(partitions2);
-    for (int wid = 0;
-        wid < NUM_WINDOWS;
-        wid++) {
-      fsRolling.beginWindow(wid);
-      for (int tupleCounter = 0;
-          tupleCounter < BLAST_SIZE;
-          tupleCounter++) {
-        // note: 2014 - 12 - 10 is integer arithmetic, so this puts key "1992" -> 1992
-        map.put(2014 - 12 - 10 + "", 2014 - 12 - 10);
-        fsRolling.input.put(map);
-        map.clear();
-      }
-
-      if (wid == 7) {
-        fsRolling.committed(wid - 1);
-        hiveOperator.processTuple(mapping1);
-        hiveOperator.processTuple(mapping2);
-      }
-
-      fsRolling.endWindow();
-    }
-
-    fsRolling.teardown();
-
-    hiveStore.connect();
-
-    client.execute("select * from " + tablemap + " where dt='2014-12-10'");
-    List<String> recordsInDatePartition1 = client.fetchAll();
-
-    client.execute("drop table " + tablemap);
-    hiveStore.disconnect();
-
-    Assert.assertEquals(13, recordsInDatePartition1.size());
-    for (int i = 0; i < recordsInDatePartition1.size(); i++) {
-      LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i));
-      /* Each record is returned as an Object[] holding the data and partition columns,
-         so cast the row to Object[] before asserting on its elements. */
-      Object record = recordsInDatePartition1.get(i);
-      Object[] records = (Object[])record;
-      Assert.assertEquals("2014-12-10", records[1]);
-    }
-
-  }
-
-  @Test
-  public void testHDFSHiveCheckpoint() throws SQLException, TException
-  {
-    hiveInitializeDatabase(createStore(null));
-    HiveStore hiveStore = createStore(null);
-    hiveStore.setFilepath(testMeta.getDir());
-    HiveOperator outputOperator = new HiveOperator();
-    HiveOperator newOp;
-
-    outputOperator.setHivestore(hiveStore);
-    ArrayList<String> hivePartitionColumns = new ArrayList<String>();
-    hivePartitionColumns.add("dt");
-    FSRollingTestImpl fsRolling = new FSRollingTestImpl();
-    hiveInitializeDatabase(createStore(null));
-    outputOperator.setHivePartitionColumns(hivePartitionColumns);
-    outputOperator.setTablename(tablename);
-    fsRolling.setFilePath(testMeta.getDir());
-    short permission = 511;
-    fsRolling.setFilePermission(permission);
-    fsRolling.setMaxLength(128);
-    AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
-    attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
-    attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
-
-    fsRolling.setup(context);
-
-    FilePartitionMapping mapping1 = new FilePartitionMapping();
-    FilePartitionMapping mapping2 = new FilePartitionMapping();
-    FilePartitionMapping mapping3 = new FilePartitionMapping();
-    outputOperator.setup(context);
-    mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-10" + "/" + "0-transaction.out.part.0");
-    ArrayList<String> partitions1 = new ArrayList<String>();
-    partitions1.add("2014-12-10");
-    mapping1.setPartition(partitions1);
-    mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0");
-    ArrayList<String> partitions2 = new ArrayList<String>();
-    partitions2.add("2014-12-11");
-    mapping2.setPartition(partitions2);
-    ArrayList<String> partitions3 = new ArrayList<String>();
-    partitions3.add("2014-12-12");
-    mapping3.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-12" + "/" + "0-transaction.out.part.0");
-    mapping3.setPartition(partitions3);
-    for (int wid = 0, total = 0;
-        wid < NUM_WINDOWS;
-        wid++) {
-      fsRolling.beginWindow(wid);
-      for (int tupleCounter = 0;
-          tupleCounter < BLAST_SIZE && total < DATABASE_SIZE;
-          tupleCounter++, total++) {
-        fsRolling.input.process("2014-12-1" + tupleCounter);
-      }
-      if (wid == 7) {
-        fsRolling.committed(wid - 1);
-        outputOperator.processTuple(mapping1);
-        outputOperator.processTuple(mapping2);
-      }
-
-      fsRolling.endWindow();
-
-      if (wid == 9) {
-        Kryo kryo = new Kryo();
-        FieldSerializer<HiveOperator> f1 = (FieldSerializer<HiveOperator>)kryo.getSerializer(HiveOperator.class);
-        FieldSerializer<HiveStore> f2 = (FieldSerializer<HiveStore>)kryo.getSerializer(HiveStore.class);
-
-        f1.setCopyTransient(false);
-        f2.setCopyTransient(false);
-        newOp = kryo.copy(outputOperator);
-        outputOperator.teardown();
-        newOp.setup(context);
-
-        newOp.beginWindow(7);
-        newOp.processTuple(mapping3);
-
-        newOp.endWindow();
-        newOp.teardown();
-        break;
-      }
-
-    }
-
-    hiveStore.connect();
-
-    client.execute("select * from " + tablename + " where dt='2014-12-10'");
-    List<String> recordsInDatePartition1 = client.fetchAll();
-
-    client.execute("select * from " + tablename + " where dt='2014-12-11'");
-    List<String> recordsInDatePartition2 = client.fetchAll();
-
-    client.execute("select * from " + tablename + " where dt='2014-12-12'");
-    List<String> recordsInDatePartition3 = client.fetchAll();
-
-    client.execute("drop table " + tablename);
-    hiveStore.disconnect();
-
-    Assert.assertEquals(7, recordsInDatePartition1.size());
-    for (int i = 0; i < recordsInDatePartition1.size(); i++) {
-      LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i));
-      /* Each record is returned as an Object[] holding the data and partition columns,
-         so cast the row to Object[] before asserting on its elements. */
-      Object record = recordsInDatePartition1.get(i);
-      Object[] records = (Object[])record;
-      Assert.assertEquals("2014-12-10", records[1]);
-    }
-    Assert.assertEquals(7, recordsInDatePartition2.size());
-    for (int i = 0; i < recordsInDatePartition2.size(); i++) {
-      LOG.debug("records in second date partition are {}", recordsInDatePartition2.get(i));
-      Object record = recordsInDatePartition2.get(i);
-      Object[] records = (Object[])record;
-      Assert.assertEquals("2014-12-11", records[1]);
-    }
-    Assert.assertEquals(10, recordsInDatePartition3.size());
-    for (int i = 0; i < recordsInDatePartition3.size(); i++) {
-      LOG.debug("records in third date partition are {}", recordsInDatePartition3.get(i));
-      Object record = recordsInDatePartition3.get(i);
-      Object[] records = (Object[])record;
-      Assert.assertEquals("2014-12-12", records[1]);
-    }
-
-  }
-
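testHDFSHiveCheckpoint simulates failure and recovery without a real checkpoint: it clones the running operator with Kryo, telling the FieldSerializer not to copy transient fields so the clone sees only the state that checkpointing would actually persist, and then drives the clone as if it had been restored. The core of that idiom, reduced to its essentials (the method and variable names are generic):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.FieldSerializer;

import com.datatorrent.contrib.hive.HiveOperator;
import com.datatorrent.contrib.hive.HiveStore;

public class CheckpointCopySketch
{
  @SuppressWarnings("unchecked")
  public static HiveOperator checkpointCopy(HiveOperator running)
  {
    Kryo kryo = new Kryo();
    FieldSerializer<HiveOperator> opSer =
        (FieldSerializer<HiveOperator>)kryo.getSerializer(HiveOperator.class);
    FieldSerializer<HiveStore> storeSer =
        (FieldSerializer<HiveStore>)kryo.getSerializer(HiveStore.class);
    // Skip transient fields during copy(), mirroring what a real checkpoint
    // serializes; transient state must be rebuilt by calling setup() on the copy.
    opSer.setCopyTransient(false);
    storeSer.setCopyTransient(false);
    return kryo.copy(running);  // the "restored" operator; caller invokes setup() on it
  }
}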
-  public class InnerObj
-  {
-    public InnerObj()
-    {
-    }
-
-    private int id;
-    private String date;
-
-    public String getDate()
-    {
-      return date;
-    }
-
-    public void setDate(String date)
-    {
-      this.date = date;
-    }
-
-    public int getId()
-    {
-      return id;
-    }
-
-    public void setId(int id)
-    {
-      this.id = id;
-    }
-
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(HiveMockTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/contrib/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java b/contrib/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java
deleted file mode 100755
index fc7de9c..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/hive/HiveStoreTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.hive;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class HiveStoreTest
-{
-  @Test
-  public void defaultDriverTest()
-  {
-    HiveStore hiveStore = new HiveStore();
-    Assert.assertEquals("Test default driver", HiveStore.HIVE_DRIVER, hiveStore.getDatabaseDriver());
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/hive/XmlJavadocCommentsExtractor.xsl b/hive/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..ec72325
--- /dev/null
+++ b/hive/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description: The transformation strips off all information except for
+                 comments and tags from xml javadoc generated by xml-doclet.
+-->
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bb0a93d0/hive/pom.xml
----------------------------------------------------------------------
diff --git a/hive/pom.xml b/hive/pom.xml
new file mode 100755
index 0000000..25631f4
--- /dev/null
+++ b/hive/pom.xml
@@ -0,0 +1,262 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar</artifactId>
+    <version>3.5.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-hive</artifactId>
+  <name>Apache Apex Malhar Hive Support</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <checkstyle.console>true</checkstyle.console>
+  </properties>
+
+  <build>
+    <plugins>
+      <!-- Publish tests jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- create resource directory for xml javadoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>createJavadocDirectory</id>
+            <phase>generate-resources</phase>
+            <configuration>
+              <tasks>
+                <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- generate javadoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <!-- generate xml javadoc -->
+          <execution>
+            <id>xml-doclet</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>javadoc</goal>
+            </goals>
+            <configuration>
+              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+              <useStandardDocletOptions>false</useStandardDocletOptions>
+              <docletArtifact>
+                <groupId>com.github.markusbernhardt</groupId>
+                <artifactId>xml-doclet</artifactId>
+                <version>1.0.4</version>
+              </docletArtifact>
+            </configuration>
+          </execution>
+          <!-- generate default javadoc jar with custom tags -->
+          <execution>
+            <id>attach-sources</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <configuration>
+              <skip>true</skip>
+              <tags>
+                <tag>
+                  <name>customTag1</name>
+                  <placement>a</placement>
+                  <head>Custom Tag One:</head>
+                </tag>
+                <tag>
+                  <name>customTag2</name>
+                  <placement>a</placement>
+                  <head>Custom Tag two:</head>
+                </tag>
+                <tag>
+                  <name>customTag3</name>
+                  <placement>a</placement>
+                  <head>Custom Tag three:</head>
+                </tag>
+              </tags>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>xml-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <id>transform-xmljavadoc</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>transform</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <transformationSets>
+            <transformationSet>
+              <dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+              <includes>
+                <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+              </includes>
+              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+            </transformationSet>
+          </transformationSets>
+        </configuration>
+      </plugin>
+      <!-- copy xml javadoc to class jar -->
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                  <includes>
+                    <include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                  </includes>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <maxAllowedViolations>0</maxAllowedViolations>
+          <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <argLine>-Xmx2048m</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>0.13.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-service</artifactId>
+      <version>0.13.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>0.13.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.teknek</groupId>
+      <artifactId>hiveunit</artifactId>
+      <version>0.0.3</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+      <type>jar</type>
+    </dependency>
+  </dependencies>
</project>
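Downstream, the benchmark (or any application) now depends on malhar-hive instead of malhar-contrib for these classes and wires the two operators into a DAG. A rough sketch of such an application follows; the populateDAG structure is standard Apex, but the port names on the stream are assumptions, since this diff does not show the operators' port declarations.

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.hive.FSPojoToHiveOperator;
import com.datatorrent.contrib.hive.HiveOperator;
import com.datatorrent.contrib.hive.HiveStore;

public class HiveOutputAppSketch implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    FSPojoToHiveOperator writer = dag.addOperator("fsWriter", new FSPojoToHiveOperator());
    writer.setFilePath("/user/apex/hive-staging");  // illustrative staging dir

    HiveStore store = new HiveStore();
    store.setFilepath("/user/apex/hive-staging");

    HiveOperator loader = dag.addOperator("hiveLoader", new HiveOperator());
    loader.setHivestore(store);
    loader.setTablename("transactions");            // hypothetical table

    // Assumed port names: finalized-file mappings flow from writer to loader.
    dag.addStream("committedFiles", writer.outputPort, loader.input);
  }
}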
