Repository: apex-malhar

Updated Branches:
  refs/heads/master a37869e9a -> 7862fb3ff
APEXMALHAR-2516 contrib dependency issues


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7862fb3f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7862fb3f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7862fb3f

Branch: refs/heads/master
Commit: 7862fb3fff86f3672824db0589c64d9fbb631d0e
Parents: a37869e
Author: Thomas Weise <t...@apache.org>
Authored: Wed Jul 5 14:51:09 2017 -0700
Committer: Thomas Weise <t...@apache.org>
Committed: Fri Jul 7 06:29:37 2017 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 | 12 +++------
 .../datatorrent/contrib/avro/PojoToAvro.java    |  8 +++---
 .../parquet/AbstractParquetFileReader.java      | 15 +++++------
 .../contrib/parquet/ParquetFilePOJOReader.java  | 13 +++++-----
 .../contrib/avro/PojoToAvroTest.java            |  8 +++---
 .../parquet/ParquetFilePOJOReaderTest.java      | 26 ++++++++++----------
 .../JdbcIngest/JdbcPollerApplication.java       |  3 ++-
 .../lib/db/jdbc/JdbcPOJOPollInputOperator.java  |  1 -
 pom.xml                                         |  1 +
 .../malhar/sql/codegen/BeanClassGenerator.java  |  1 -
 .../malhar/sql/schema/TupleSchemaRegistry.java  |  2 --
 11 files changed, 42 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 83305cb..3c5797c 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -554,16 +554,10 @@
         </exclusions>
       </dependency>
       <dependency>
-        <groupId>org.apache.hive</groupId>
-        <artifactId>hive-exec</artifactId>
-        <version>0.13.1</version>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-hadoop</artifactId>
+        <version>1.8.1</version>
         <optional>true</optional>
-        <exclusions>
-          <exclusion>
-            <groupId>*</groupId>
-            <artifactId>*</artifactId>
-          </exclusion>
-        </exclusions>
       </dependency>
       <dependency>
        <groupId>org.apache.solr</groupId>
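For context (not part of the patch): the change above swaps the broad hive-exec jar, which judging by the import changes below was only on the classpath for its bundled parquet.* classes, for the specific parquet-hadoop artifact the Parquet operators compile against. A minimal read sketch against that artifact, assuming a local file /tmp/sample.parquet (hypothetical path) and parquet-hadoop 1.8.1 on the classpath as declared above:

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    public class ParquetReadSketch
    {
      public static void main(String[] args) throws Exception
      {
        // ParquetReader and GroupReadSupport now resolve from parquet-hadoop,
        // so nothing from hive-exec is required at compile time.
        try (ParquetReader<Group> reader =
            ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/sample.parquet")).build()) {
          for (Group group = reader.read(); group != null; group = reader.read()) {
            System.out.println(group);
          }
        }
      }
    }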
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
index 2f8fb19..41c56e3 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
@@ -86,9 +86,9 @@ public class PojoToAvro extends BaseOperator
   @VisibleForTesting
   int fieldErrorCount = 0;
 
-  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
+  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<>();
 
-  public final transient DefaultOutputPort<Object> errorPort = new DefaultOutputPort<Object>();
+  public final transient DefaultOutputPort<Object> errorPort = new DefaultOutputPort<>();
 
   private void parseSchema() throws IOException
   {
@@ -172,7 +172,7 @@ public class PojoToAvro extends BaseOperator
   {
     setColumnNames(schema.getFields());
 
-    keyMethodMap = new ArrayList<Getter>();
+    keyMethodMap = new ArrayList<>();
     for (int i = 0; i < getColumnNames().size(); i++) {
       try {
         keyMethodMap.add(generateGettersForField(cls, getColumnNames().get(i).name()));
@@ -217,7 +217,7 @@ public class PojoToAvro extends BaseOperator
     try {
       record = getGenericRecord(tuple);
     } catch (Exception e) {
-      LOG.error("Exception in parsing record");
+      LOG.error("Exception in creating record");
       errorCount++;
     }


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java b/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
index c4eefff..14ab918 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
@@ -21,17 +21,17 @@ package com.datatorrent.contrib.parquet;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
 
-import parquet.example.data.Group;
-import parquet.hadoop.ParquetReader;
-import parquet.hadoop.example.GroupReadSupport;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
 /**
  * Base implementation of ParquetFileReader. Reads Parquet files from input
  * directory using GroupReadSupport. Derived classes need to implement
@@ -41,6 +41,7 @@ import parquet.schema.MessageTypeParser;
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public abstract class AbstractParquetFileReader<T> extends AbstractFileInputOperator<T>
 {
   private transient ParquetReader<Group> reader;
@@ -70,7 +71,7 @@ public abstract class AbstractParquetFileReader<T> extends AbstractFileInputOper
     InputStream is = super.openFile(path);
     GroupReadSupport readSupport = new GroupReadSupport();
     readSupport.init(configuration, null, schema);
-    reader = new ParquetReader<Group>(path, readSupport);
+    reader = new ParquetReader<>(path, readSupport);
     return is;
   }
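For context (not part of the patch): GroupReadSupport materializes each record as an org.apache.parquet.example.data.Group, and ParquetFilePOJOReader (next file below) maps the schema's primitive types onto POJO fields. A sketch of the typed Group accessors involved, with illustrative field names borrowed from the test schema further below; the operator resolves field names from its schema and field mapping rather than hard-coding them:

    import org.apache.parquet.example.data.Group;

    public class GroupAccessSketch
    {
      static void printEvent(Group group)
      {
        int eventId = group.getInteger("event_id", 0); // INT32 column
        String orgId = group.getString("org_id", 0);   // BINARY (UTF8) column
        long longId = group.getLong("long_id", 0);     // INT64 column
        System.out.println(eventId + "," + orgId + "," + longId);
      }
    }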
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java b/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
index 37bd60b..36c5b55 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
@@ -25,6 +25,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultOutputPort;
@@ -32,11 +37,6 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.util.FieldInfo.SupportType;
 import com.datatorrent.lib.util.PojoUtils;
 
-import parquet.example.data.Group;
-import parquet.io.InvalidRecordException;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
 /**
  * <p>
  * ParquetFilePOJOReader
@@ -57,6 +57,7 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class ParquetFilePOJOReader extends AbstractParquetFileReader<Object>
 {
 
@@ -175,7 +176,7 @@ public class ParquetFilePOJOReader extends AbstractParquetFileReader<Object>
   private void initialiseActiveFieldInfo(String fieldMapping)
   {
     String[] fields = fieldMapping.split(RECORD_SEPARATOR);
-    activeFieldInfos = new ArrayList<ActiveFieldInfo>(fields.length);
+    activeFieldInfos = new ArrayList<>(fields.length);
     for (String field : fields) {
       String[] token = field.split(FIELD_SEPARATOR);
       try {


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
index 772a057..787e684 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/PojoToAvroTest.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.contrib.avro;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
 
@@ -26,7 +27,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
-import org.python.google.common.collect.Lists;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
@@ -41,7 +41,7 @@ public class PojoToAvroTest
       + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"},"
       + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}";
 
-  CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> outputSink = new CollectorTestSink<>();
   PojoToAvro avroWriter = new PojoToAvro();
 
   public class TestMeta extends TestWatcher
@@ -74,7 +74,7 @@ public class PojoToAvroTest
   public void testWriting() throws Exception
   {
 
-    List<SimpleOrder> orderList = Lists.newArrayList();
+    List<SimpleOrder> orderList = new ArrayList<>();
     orderList.add(new SimpleOrder(1, 11, 100.25, "customerOne"));
     orderList.add(new SimpleOrder(2, 22, 200.25, "customerTwo"));
     orderList.add(new SimpleOrder(3, 33, 300.25, "customerThree"));
@@ -101,7 +101,7 @@ public class PojoToAvroTest
   public void testWriteFailure() throws Exception
   {
 
-    List<Order> orderList = Lists.newArrayList();
+    List<Order> orderList = new ArrayList<>();
     orderList.add(new Order(11));
     orderList.add(new Order(22));
     orderList.add(new Order(33));
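For context (not part of the patch): the test previously imported org.python.google.common.collect.Lists, a Guava copy shaded inside the jython jar that happened to be on the test classpath, which is exactly the kind of accidental coupling this change removes; plain java.util.ArrayList needs no third-party jar. Separately, both the operator and the tests build per-column accessors through PojoUtils instead of reflecting on every tuple. A minimal sketch of that pattern, assuming an illustrative POJO with a String field customerName (shaped like the test's SimpleOrder):

    import com.datatorrent.lib.util.PojoUtils;
    import com.datatorrent.lib.util.PojoUtils.Getter;

    public class GetterSketch
    {
      public static class Order
      {
        private String customerName = "customerOne";

        public String getCustomerName()
        {
          return customerName;
        }
      }

      public static void main(String[] args)
      {
        // PojoUtils generates a typed accessor once; invoking it afterwards
        // avoids per-tuple reflection cost.
        Getter<Order, String> getter =
            PojoUtils.createGetter(Order.class, "customerName", String.class);
        System.out.println(getter.get(new Order()));
      }
    }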
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java
index 89a9839..862c96a 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReaderTest.java
@@ -55,15 +55,15 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.PojoUtils;
 import com.datatorrent.lib.util.PojoUtils.Getter;
 
-import parquet.column.ColumnDescriptor;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.io.ParquetEncodingException;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 
 import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
 
@@ -74,7 +74,7 @@ public class ParquetFilePOJOReaderTest
       + "required INT32 event_id;" + "required BINARY org_id (UTF8);" + "required INT64 long_id;"
       + "optional BOOLEAN css_file_loaded;" + "optional FLOAT float_val;" + "optional DOUBLE double_val;}";
 
-  CollectorTestSink<Object> outputSink = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> outputSink = new CollectorTestSink<>();
   ParquetFilePOJOReader parquetFilePOJOReader = new ParquetFilePOJOReader();
 
   public static class TestMeta extends TestWatcher
@@ -506,7 +506,7 @@ public class ParquetFilePOJOReaderTest
     public ParquetPOJOWriter(Path file, MessageType schema, Class klass, CompressionCodecName codecName,
         boolean enableDictionary) throws IOException
     {
-      super(file, (WriteSupport<Object>)new POJOWriteSupport(schema, klass), codecName, DEFAULT_BLOCK_SIZE,
+      super(file, new POJOWriteSupport(schema, klass), codecName, DEFAULT_BLOCK_SIZE,
          DEFAULT_PAGE_SIZE, enableDictionary, false);
     }
 
@@ -530,7 +530,7 @@ public class ParquetFilePOJOReaderTest
 
     private void init()
     {
-      keyMethodMap = new ArrayList<Getter>();
+      keyMethodMap = new ArrayList<>();
      for (int i = 0; i < cols.size(); i++) {
        try {
          keyMethodMap.add(generateGettersForField(klass, cols.get(i).getPath()[0]));
@@ -542,7 +542,7 @@ public class ParquetFilePOJOReaderTest
     }
 
     @Override
-    public parquet.hadoop.api.WriteSupport.WriteContext init(Configuration configuration)
+    public WriteContext init(Configuration configuration)
     {
       return new WriteContext(schema, new HashMap<String, String>());
     }
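For context (not part of the patch): with the imports moved to the org.apache.parquet coordinates, the test's POJOWriteSupport can name the nested WriteContext type directly instead of spelling out parquet.hadoop.api.WriteSupport.WriteContext. A minimal self-contained WriteSupport along the same lines, for a single required BINARY (UTF8) column; the schema and field name are illustrative:

    import java.util.HashMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.api.WriteSupport;
    import org.apache.parquet.io.api.Binary;
    import org.apache.parquet.io.api.RecordConsumer;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class SingleStringWriteSupport extends WriteSupport<String>
    {
      private final MessageType schema =
          MessageTypeParser.parseMessageType("message sketch { required binary name (UTF8); }");
      private RecordConsumer consumer;

      @Override
      public WriteContext init(Configuration configuration)
      {
        // WriteContext is inherited from WriteSupport, so no fully
        // qualified name is needed.
        return new WriteContext(schema, new HashMap<String, String>());
      }

      @Override
      public void prepareForWrite(RecordConsumer recordConsumer)
      {
        this.consumer = recordConsumer;
      }

      @Override
      public void write(String record)
      {
        consumer.startMessage();
        consumer.startField("name", 0);
        consumer.addBinary(Binary.fromString(record));
        consumer.endField("name", 0);
        consumer.endMessage();
      }
    }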
dag.addOperator("JdbcPoller", new JdbcPOJOPollInputOperator()); @@ -53,7 +54,7 @@ public class JdbcPollerApplication implements StreamingApplication } /** - * This method can be modified to have field mappings based on used defined + * This method can be modified to have field mappings based on user defined * class */ private List<FieldInfo> addFieldInfos() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java index f96c6e1..8442d55 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java @@ -84,7 +84,6 @@ public class JdbcPOJOPollInputOperator extends AbstractJdbcPollInputOperator<Obj { super.setup(context); try { - // closing the query statement in super class as it is not needed if (getColumnsExpression() == null) { StringBuilder columns = new StringBuilder(); for (int i = 0; i < fieldInfos.size(); i++) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index dc9ed8b..d6bdfdd 100644 --- a/pom.xml +++ b/pom.xml @@ -134,6 +134,7 @@ <excludes> <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude> <exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude> + <exclude>com.datatorrent.contrib.parquet</exclude> </excludes> </parameter> <skip>${semver.plugin.skip}</skip> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java b/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java index 5365e20..cc2eff8 100644 --- a/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java +++ b/sql/src/main/java/org/apache/apex/malhar/sql/codegen/BeanClassGenerator.java @@ -104,7 +104,6 @@ public class BeanClassGenerator * @throws JSONException * @throws IOException */ - @SuppressWarnings("unchecked") public static byte[] createAndWriteBeanClass(String fqcn, List<TupleSchemaRegistry.SQLFieldInfo> fieldList, FSDataOutputStream outputStream) throws JSONException, IOException { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7862fb3f/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java index 460fafb..7742503 100644 --- a/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java +++ b/sql/src/main/java/org/apache/apex/malhar/sql/schema/TupleSchemaRegistry.java @@ -129,9 +129,7 @@ public class TupleSchemaRegistry case DOUBLE: return Double.class; case DATE: - return Date.class; case TIME: - return Date.class; case TIMESTAMP: return Date.class; case CHAR:
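For context (not part of the patch): the TupleSchemaRegistry cleanup relies on ordinary switch fall-through — adjacent case labels with no break or return share one result, so DATE, TIME, and TIMESTAMP need only a single return Date.class. A sketch with an assumed ColumnType enum standing in for the enum the registry actually switches on:

    import java.util.Date;

    public class FallThroughSketch
    {
      // Stand-in for the real enum; illustrative only.
      enum ColumnType { DOUBLE, DATE, TIME, TIMESTAMP }

      static Class<?> javaTypeFor(ColumnType type)
      {
        switch (type) {
          case DOUBLE:
            return Double.class;
          case DATE:       // falls through
          case TIME:       // falls through
          case TIMESTAMP:
            return Date.class;
          default:
            throw new IllegalArgumentException("Unsupported type: " + type);
        }
      }

      public static void main(String[] args)
      {
        System.out.println(javaTypeFor(ColumnType.TIME)); // class java.util.Date
      }
    }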