[FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
This closes #2897. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de4fe3b7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de4fe3b7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de4fe3b7 Branch: refs/heads/master Commit: de4fe3b7392948807753d65d13f3da968e6c7de0 Parents: cc006ff Author: Fabian Hueske <[email protected]> Authored: Tue Nov 29 13:57:30 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Dec 2 14:28:35 2016 +0100 ---------------------------------------------------------------------- .gitignore | 2 +- flink-batch-connectors/flink-avro/pom.xml | 216 -- .../apache/flink/api/avro/DataInputDecoder.java | 213 -- .../flink/api/avro/DataOutputEncoder.java | 183 -- .../api/avro/FSDataInputStreamWrapper.java | 68 - .../flink/api/java/io/AvroInputFormat.java | 207 -- .../flink/api/java/io/AvroOutputFormat.java | 189 -- .../src/test/assembly/test-assembly.xml | 36 - .../api/avro/AvroExternalJarProgramITCase.java | 80 - .../flink/api/avro/AvroOutputFormatITCase.java | 176 -- .../flink/api/avro/EncoderDecoderTest.java | 528 ----- .../avro/testjar/AvroExternalJarProgram.java | 219 -- .../apache/flink/api/io/avro/AvroPojoTest.java | 255 --- .../api/io/avro/AvroRecordInputFormatTest.java | 458 ---- .../io/avro/AvroSplittableInputFormatTest.java | 326 --- .../api/io/avro/example/AvroTypeExample.java | 108 - .../apache/flink/api/io/avro/example/User.java | 269 --- .../io/AvroInputFormatTypeExtractionTest.java | 81 - .../flink/api/java/io/AvroOutputFormatTest.java | 154 -- .../src/test/resources/avro/user.avsc | 35 - .../src/test/resources/log4j-test.properties | 27 - .../src/test/resources/logback-test.xml | 29 - .../flink-avro/src/test/resources/testdata.avro | Bin 4572 -> 0 bytes .../flink-hadoop-compatibility/pom.xml | 182 -- .../api/java/typeutils/WritableTypeInfo.java | 154 -- .../typeutils/runtime/WritableComparator.java | 188 -- 
.../typeutils/runtime/WritableSerializer.java | 152 -- .../flink/hadoopcompatibility/HadoopInputs.java | 118 -- .../flink/hadoopcompatibility/HadoopUtils.java | 52 - .../mapred/HadoopMapFunction.java | 133 -- .../mapred/HadoopReduceCombineFunction.java | 168 -- .../mapred/HadoopReduceFunction.java | 142 -- .../mapred/wrapper/HadoopOutputCollector.java | 59 - .../wrapper/HadoopTupleUnwrappingIterator.java | 94 - .../scala/HadoopInputs.scala | 143 -- .../java/typeutils/WritableExtractionTest.java | 206 -- .../java/typeutils/WritableInfoParserTest.java | 84 - .../java/typeutils/WritableTypeInfoTest.java | 72 - .../typeutils/runtime/StringArrayWritable.java | 83 - .../runtime/WritableComparatorTest.java | 53 - .../runtime/WritableComparatorUUIDTest.java | 46 - .../api/java/typeutils/runtime/WritableID.java | 78 - .../runtime/WritableSerializerTest.java | 50 - .../runtime/WritableSerializerUUIDTest.java | 50 - .../hadoopcompatibility/HadoopUtilsTest.java | 34 - .../mapred/HadoopMapFunctionITCase.java | 182 -- .../mapred/HadoopMapredITCase.java | 47 - .../HadoopReduceCombineFunctionITCase.java | 265 --- .../mapred/HadoopReduceFunctionITCase.java | 213 -- .../mapred/HadoopTestData.java | 62 - .../example/HadoopMapredCompatWordCount.java | 133 -- .../HadoopTupleUnwrappingIteratorTest.java | 139 -- .../mapreduce/HadoopInputOutputITCase.java | 47 - .../mapreduce/example/WordCount.java | 119 -- .../src/test/resources/log4j-test.properties | 27 - .../src/test/resources/logback-test.xml | 29 - flink-batch-connectors/flink-hbase/pom.xml | 264 --- .../flink/addons/hbase/TableInputFormat.java | 289 --- .../flink/addons/hbase/TableInputSplit.java | 89 - .../hbase/HBaseTestingClusterAutostarter.java | 238 --- .../addons/hbase/TableInputFormatITCase.java | 120 -- .../hbase/example/HBaseFlinkTestConstants.java | 28 - .../addons/hbase/example/HBaseReadExample.java | 92 - .../addons/hbase/example/HBaseWriteExample.java | 202 -- .../hbase/example/HBaseWriteStreamExample.java | 113 - 
.../src/test/resources/log4j-test.properties | 23 - flink-batch-connectors/flink-hcatalog/pom.xml | 182 -- .../flink/hcatalog/HCatInputFormatBase.java | 410 ---- .../flink/hcatalog/java/HCatInputFormat.java | 160 -- .../flink/hcatalog/scala/HCatInputFormat.scala | 229 -- flink-batch-connectors/flink-jdbc/pom.xml | 66 - .../flink/api/java/io/jdbc/JDBCInputFormat.java | 404 ---- .../api/java/io/jdbc/JDBCOutputFormat.java | 315 --- .../split/GenericParameterValuesProvider.java | 44 - .../split/NumericBetweenParametersProvider.java | 66 - .../io/jdbc/split/ParameterValuesProvider.java | 35 - .../flink/api/java/io/jdbc/JDBCFullTest.java | 101 - .../api/java/io/jdbc/JDBCInputFormatTest.java | 247 --- .../api/java/io/jdbc/JDBCOutputFormatTest.java | 169 -- .../flink/api/java/io/jdbc/JDBCTestBase.java | 183 -- .../src/test/resources/log4j-test.properties | 19 - .../src/test/resources/logback-test.xml | 29 - flink-batch-connectors/pom.xml | 45 - flink-connectors/flink-avro/pom.xml | 216 ++ .../apache/flink/api/avro/DataInputDecoder.java | 213 ++ .../flink/api/avro/DataOutputEncoder.java | 183 ++ .../api/avro/FSDataInputStreamWrapper.java | 68 + .../flink/api/java/io/AvroInputFormat.java | 207 ++ .../flink/api/java/io/AvroOutputFormat.java | 189 ++ .../src/test/assembly/test-assembly.xml | 36 + .../api/avro/AvroExternalJarProgramITCase.java | 80 + .../flink/api/avro/AvroOutputFormatITCase.java | 176 ++ .../flink/api/avro/EncoderDecoderTest.java | 528 +++++ .../avro/testjar/AvroExternalJarProgram.java | 219 ++ .../apache/flink/api/io/avro/AvroPojoTest.java | 255 +++ .../api/io/avro/AvroRecordInputFormatTest.java | 458 ++++ .../io/avro/AvroSplittableInputFormatTest.java | 326 +++ .../api/io/avro/example/AvroTypeExample.java | 108 + .../apache/flink/api/io/avro/example/User.java | 269 +++ .../io/AvroInputFormatTypeExtractionTest.java | 81 + .../flink/api/java/io/AvroOutputFormatTest.java | 154 ++ .../src/test/resources/avro/user.avsc | 35 + 
.../src/test/resources/log4j-test.properties | 27 + .../src/test/resources/logback-test.xml | 29 + .../flink-avro/src/test/resources/testdata.avro | Bin 0 -> 4572 bytes .../flink-connector-cassandra/pom.xml | 179 ++ .../cassandra/CassandraInputFormat.java | 131 ++ .../cassandra/CassandraOutputFormat.java | 125 ++ .../cassandra/CassandraCommitter.java | 151 ++ .../connectors/cassandra/CassandraPojoSink.java | 67 + .../connectors/cassandra/CassandraSink.java | 329 +++ .../connectors/cassandra/CassandraSinkBase.java | 98 + .../cassandra/CassandraTupleSink.java | 59 + .../cassandra/CassandraTupleWriteAheadSink.java | 159 ++ .../connectors/cassandra/ClusterBuilder.java | 43 + .../cassandra/example/BatchExample.java | 77 + .../cassandra/CassandraConnectorITCase.java | 440 ++++ .../CassandraTupleWriteAheadSinkTest.java | 127 ++ .../streaming/connectors/cassandra/Pojo.java | 65 + .../example/CassandraPojoSinkExample.java | 62 + .../example/CassandraTupleSinkExample.java | 62 + .../CassandraTupleWriteAheadSinkExample.java | 96 + .../connectors/cassandra/example/Message.java | 56 + .../src/test/resources/cassandra.yaml | 43 + .../src/test/resources/log4j-test.properties | 29 + .../flink-connector-elasticsearch/pom.xml | 90 + .../elasticsearch/ElasticsearchSink.java | 315 +++ .../elasticsearch/IndexRequestBuilder.java | 66 + .../elasticsearch/ElasticsearchSinkITCase.java | 205 ++ .../examples/ElasticsearchExample.java | 80 + .../src/test/resources/log4j-test.properties | 27 + .../src/test/resources/logback-test.xml | 30 + .../flink-connector-elasticsearch2/pom.xml | 83 + .../elasticsearch2/BulkProcessorIndexer.java | 35 + .../elasticsearch2/ElasticsearchSink.java | 257 +++ .../ElasticsearchSinkFunction.java | 60 + .../elasticsearch2/RequestIndexer.java | 25 + .../elasticsearch2/ElasticsearchSinkITCase.java | 233 ++ .../examples/ElasticsearchExample.java | 90 + .../src/test/resources/log4j-test.properties | 27 + .../src/test/resources/logback-test.xml | 30 + 
.../flink-connector-filesystem/pom.xml | 163 ++ .../connectors/fs/AvroKeyValueSinkWriter.java | 309 +++ .../flink/streaming/connectors/fs/Bucketer.java | 55 + .../flink/streaming/connectors/fs/Clock.java | 33 + .../connectors/fs/DateTimeBucketer.java | 126 ++ .../connectors/fs/NonRollingBucketer.java | 47 + .../streaming/connectors/fs/RollingSink.java | 916 ++++++++ .../connectors/fs/SequenceFileWriter.java | 151 ++ .../connectors/fs/StreamWriterBase.java | 152 ++ .../streaming/connectors/fs/StringWriter.java | 86 + .../streaming/connectors/fs/SystemClock.java | 29 + .../flink/streaming/connectors/fs/Writer.java | 73 + .../fs/bucketing/BasePathBucketer.java | 39 + .../connectors/fs/bucketing/Bucketer.java | 47 + .../connectors/fs/bucketing/BucketingSink.java | 1082 ++++++++++ .../fs/bucketing/DateTimeBucketer.java | 102 + .../src/main/resources/log4j.properties | 27 + .../fs/RollingSinkFaultToleranceITCase.java | 300 +++ .../connectors/fs/RollingSinkITCase.java | 991 +++++++++ .../connectors/fs/RollingSinkSecuredITCase.java | 252 +++ .../BucketingSinkFaultToleranceITCase.java | 297 +++ .../fs/bucketing/BucketingSinkTest.java | 867 ++++++++ .../src/test/resources/log4j-test.properties | 29 + .../src/test/resources/logback-test.xml | 30 + flink-connectors/flink-connector-flume/pom.xml | 175 ++ .../streaming/connectors/flume/FlumeSink.java | 141 ++ .../flink-connector-kafka-0.10/pom.xml | 205 ++ .../connectors/kafka/FlinkKafkaConsumer010.java | 153 ++ .../connectors/kafka/FlinkKafkaProducer010.java | 398 ++++ .../kafka/Kafka010JsonTableSource.java | 71 + .../connectors/kafka/Kafka010TableSource.java | 75 + .../kafka/internal/Kafka010Fetcher.java | 104 + .../internal/KafkaConsumerCallBridge010.java | 40 + .../src/main/resources/log4j.properties | 29 + .../connectors/kafka/Kafka010FetcherTest.java | 484 +++++ .../connectors/kafka/Kafka010ITCase.java | 313 +++ .../kafka/Kafka010ProducerITCase.java | 33 + .../kafka/KafkaTestEnvironmentImpl.java | 420 ++++ 
.../src/test/resources/log4j-test.properties | 30 + .../src/test/resources/logback-test.xml | 30 + .../flink-connector-kafka-0.8/pom.xml | 219 ++ .../connectors/kafka/FlinkKafkaConsumer08.java | 398 ++++ .../connectors/kafka/FlinkKafkaConsumer081.java | 39 + .../connectors/kafka/FlinkKafkaConsumer082.java | 39 + .../connectors/kafka/FlinkKafkaProducer.java | 64 + .../connectors/kafka/FlinkKafkaProducer08.java | 145 ++ .../connectors/kafka/Kafka08JsonTableSink.java | 52 + .../kafka/Kafka08JsonTableSource.java | 71 + .../connectors/kafka/Kafka08TableSource.java | 75 + .../kafka/internals/ClosableBlockingQueue.java | 507 +++++ .../kafka/internals/Kafka08Fetcher.java | 481 +++++ .../kafka/internals/KillerWatchDog.java | 62 + .../kafka/internals/PartitionInfoFetcher.java | 66 + .../internals/PeriodicOffsetCommitter.java | 85 + .../kafka/internals/SimpleConsumerThread.java | 504 +++++ .../kafka/internals/ZookeeperOffsetHandler.java | 164 ++ .../connectors/kafka/Kafka08ITCase.java | 248 +++ .../kafka/Kafka08JsonTableSinkTest.java | 48 + .../kafka/Kafka08JsonTableSourceTest.java | 45 + .../connectors/kafka/Kafka08ProducerITCase.java | 32 + .../connectors/kafka/KafkaConsumer08Test.java | 139 ++ .../connectors/kafka/KafkaLocalSystemTime.java | 48 + .../connectors/kafka/KafkaProducerTest.java | 123 ++ .../kafka/KafkaShortRetention08ITCase.java | 34 + .../kafka/KafkaTestEnvironmentImpl.java | 401 ++++ .../internals/ClosableBlockingQueueTest.java | 603 ++++++ .../src/test/resources/log4j-test.properties | 30 + .../src/test/resources/logback-test.xml | 30 + .../flink-connector-kafka-0.9/pom.xml | 212 ++ .../connectors/kafka/FlinkKafkaConsumer09.java | 269 +++ .../connectors/kafka/FlinkKafkaProducer09.java | 137 ++ .../connectors/kafka/Kafka09JsonTableSink.java | 50 + .../kafka/Kafka09JsonTableSource.java | 71 + .../connectors/kafka/Kafka09TableSource.java | 75 + .../connectors/kafka/internal/Handover.java | 214 ++ .../kafka/internal/Kafka09Fetcher.java | 241 +++ 
.../kafka/internal/KafkaConsumerCallBridge.java | 41 + .../kafka/internal/KafkaConsumerThread.java | 332 +++ .../src/main/resources/log4j.properties | 29 + .../connectors/kafka/Kafka09FetcherTest.java | 482 +++++ .../connectors/kafka/Kafka09ITCase.java | 129 ++ .../kafka/Kafka09JsonTableSinkTest.java | 48 + .../kafka/Kafka09JsonTableSourceTest.java | 45 + .../connectors/kafka/Kafka09ProducerITCase.java | 32 + .../kafka/Kafka09SecuredRunITCase.java | 62 + .../connectors/kafka/KafkaProducerTest.java | 126 ++ .../kafka/KafkaTestEnvironmentImpl.java | 439 ++++ .../connectors/kafka/internal/HandoverTest.java | 387 ++++ .../src/test/resources/log4j-test.properties | 32 + .../src/test/resources/logback-test.xml | 30 + .../flink-connector-kafka-base/pom.xml | 212 ++ .../kafka/FlinkKafkaConsumerBase.java | 552 +++++ .../kafka/FlinkKafkaProducerBase.java | 386 ++++ .../connectors/kafka/KafkaJsonTableSink.java | 47 + .../connectors/kafka/KafkaJsonTableSource.java | 97 + .../connectors/kafka/KafkaTableSink.java | 127 ++ .../connectors/kafka/KafkaTableSource.java | 155 ++ .../kafka/internals/AbstractFetcher.java | 552 +++++ .../kafka/internals/ExceptionProxy.java | 125 ++ .../kafka/internals/KafkaTopicPartition.java | 120 ++ .../internals/KafkaTopicPartitionLeader.java | 98 + .../internals/KafkaTopicPartitionState.java | 118 ++ ...picPartitionStateWithPeriodicWatermarks.java | 71 + ...cPartitionStateWithPunctuatedWatermarks.java | 84 + .../connectors/kafka/internals/TypeUtil.java | 38 + .../internals/metrics/KafkaMetricWrapper.java | 37 + .../kafka/partitioner/FixedPartitioner.java | 76 + .../kafka/partitioner/KafkaPartitioner.java | 41 + .../JSONDeserializationSchema.java | 46 + .../JSONKeyValueDeserializationSchema.java | 72 + .../JsonRowDeserializationSchema.java | 135 ++ .../JsonRowSerializationSchema.java | 70 + .../KeyedDeserializationSchema.java | 52 + .../KeyedDeserializationSchemaWrapper.java | 51 + .../serialization/KeyedSerializationSchema.java | 55 + 
.../KeyedSerializationSchemaWrapper.java | 48 + ...eInformationKeyValueSerializationSchema.java | 196 ++ .../kafka/FlinkKafkaConsumerBaseTest.java | 416 ++++ .../kafka/FlinkKafkaProducerBaseTest.java | 288 +++ .../kafka/JSONDeserializationSchemaTest.java | 41 + .../JSONKeyValueDeserializationSchemaTest.java | 68 + .../kafka/JsonRowDeserializationSchemaTest.java | 124 ++ .../kafka/JsonRowSerializationSchemaTest.java | 98 + .../KafkaConsumerPartitionAssignmentTest.java | 269 +++ .../connectors/kafka/KafkaConsumerTestBase.java | 2006 ++++++++++++++++++ .../connectors/kafka/KafkaProducerTestBase.java | 193 ++ .../kafka/KafkaShortRetentionTestBase.java | 291 +++ .../kafka/KafkaTableSinkTestBase.java | 106 + .../kafka/KafkaTableSourceTestBase.java | 77 + .../connectors/kafka/KafkaTestBase.java | 203 ++ .../connectors/kafka/KafkaTestEnvironment.java | 112 + .../connectors/kafka/TestFixedPartitioner.java | 104 + .../AbstractFetcherTimestampsTest.java | 320 +++ .../internals/KafkaTopicPartitionTest.java | 57 + .../kafka/testutils/DataGenerators.java | 227 ++ .../kafka/testutils/FailingIdentityMapper.java | 115 + .../testutils/FakeStandardProducerConfig.java | 34 + .../testutils/JobManagerCommunicationUtils.java | 120 ++ .../testutils/PartitionValidatingMapper.java | 53 + .../kafka/testutils/ThrottledMapper.java | 44 + .../kafka/testutils/Tuple2Partitioner.java | 48 + .../testutils/ValidatingExactlyOnceSink.java | 82 + .../testutils/ZooKeeperStringSerializer.java | 51 + .../src/test/resources/log4j-test.properties | 29 + .../src/test/resources/logback-test.xml | 30 + .../flink-connector-kinesis/pom.xml | 164 ++ .../kinesis/FlinkKinesisConsumer.java | 304 +++ .../kinesis/FlinkKinesisProducer.java | 292 +++ .../connectors/kinesis/KinesisPartitioner.java | 49 + .../kinesis/config/AWSConfigConstants.java | 70 + .../kinesis/config/ConsumerConfigConstants.java | 138 ++ .../kinesis/config/ProducerConfigConstants.java | 33 + .../kinesis/examples/ConsumeFromKinesis.java | 54 + 
.../kinesis/examples/ProduceIntoKinesis.java | 77 + .../kinesis/internals/KinesisDataFetcher.java | 679 ++++++ .../kinesis/internals/ShardConsumer.java | 287 +++ .../kinesis/model/KinesisStreamShard.java | 133 ++ .../kinesis/model/KinesisStreamShardState.java | 71 + .../kinesis/model/SentinelSequenceNumber.java | 51 + .../kinesis/model/SequenceNumber.java | 104 + .../kinesis/proxy/GetShardListResult.java | 75 + .../connectors/kinesis/proxy/KinesisProxy.java | 338 +++ .../kinesis/proxy/KinesisProxyInterface.java | 69 + .../KinesisDeserializationSchema.java | 57 + .../KinesisDeserializationSchemaWrapper.java | 57 + .../KinesisSerializationSchema.java | 45 + .../connectors/kinesis/util/AWSUtil.java | 130 ++ .../kinesis/util/KinesisConfigUtil.java | 218 ++ .../src/main/resources/log4j.properties | 27 + .../kinesis/FlinkKinesisConsumerTest.java | 472 +++++ .../internals/KinesisDataFetcherTest.java | 510 +++++ .../kinesis/internals/ShardConsumerTest.java | 122 ++ .../manualtests/ManualConsumerProducerTest.java | 121 ++ .../manualtests/ManualExactlyOnceTest.java | 147 ++ ...nualExactlyOnceWithStreamReshardingTest.java | 247 +++ .../kinesis/manualtests/ManualProducerTest.java | 91 + .../ExactlyOnceValidatingConsumerThread.java | 155 ++ .../testutils/FakeKinesisBehavioursFactory.java | 262 +++ .../KinesisEventsGeneratorProducerThread.java | 118 ++ .../testutils/KinesisShardIdGenerator.java | 25 + .../testutils/TestableFlinkKinesisConsumer.java | 60 + .../testutils/TestableKinesisDataFetcher.java | 122 ++ flink-connectors/flink-connector-nifi/pom.xml | 89 + .../connectors/nifi/NiFiDataPacket.java | 39 + .../connectors/nifi/NiFiDataPacketBuilder.java | 34 + .../streaming/connectors/nifi/NiFiSink.java | 74 + .../streaming/connectors/nifi/NiFiSource.java | 155 ++ .../connectors/nifi/StandardNiFiDataPacket.java | 46 + .../nifi/examples/NiFiSinkTopologyExample.java | 55 + .../examples/NiFiSourceTopologyExample.java | 58 + .../src/test/resources/NiFi_Flink.xml | 16 + 
.../flink-connector-rabbitmq/pom.xml | 60 + .../streaming/connectors/rabbitmq/RMQSink.java | 142 ++ .../connectors/rabbitmq/RMQSource.java | 243 +++ .../rabbitmq/common/RMQConnectionConfig.java | 448 ++++ .../connectors/rabbitmq/RMQSourceTest.java | 419 ++++ .../common/RMQConnectionConfigTest.java | 69 + .../connectors/rabbitmq/common/RMQSinkTest.java | 125 ++ flink-connectors/flink-connector-redis/pom.xml | 79 + .../streaming/connectors/redis/RedisSink.java | 188 ++ .../common/config/FlinkJedisClusterConfig.java | 187 ++ .../common/config/FlinkJedisConfigBase.java | 90 + .../common/config/FlinkJedisPoolConfig.java | 224 ++ .../common/config/FlinkJedisSentinelConfig.java | 259 +++ .../common/container/RedisClusterContainer.java | 171 ++ .../container/RedisCommandsContainer.java | 115 + .../RedisCommandsContainerBuilder.java | 116 + .../redis/common/container/RedisContainer.java | 252 +++ .../redis/common/mapper/RedisCommand.java | 86 + .../common/mapper/RedisCommandDescription.java | 94 + .../redis/common/mapper/RedisDataType.java | 66 + .../redis/common/mapper/RedisMapper.java | 66 + .../connectors/redis/RedisITCaseBase.java | 45 + .../redis/RedisSentinelClusterTest.java | 100 + .../connectors/redis/RedisSinkITCase.java | 233 ++ .../redis/RedisSinkPublishITCase.java | 137 ++ .../connectors/redis/RedisSinkTest.java | 144 ++ .../common/config/FlinkJedisConfigBaseTest.java | 50 + .../common/config/JedisClusterConfigTest.java | 49 + .../common/config/JedisPoolConfigTest.java | 29 + .../common/config/JedisSentinelConfigTest.java | 49 + .../mapper/RedisDataTypeDescriptionTest.java | 41 + .../flink-connector-twitter/pom.xml | 96 + .../connectors/twitter/TwitterSource.java | 217 ++ .../flink-hadoop-compatibility/pom.xml | 182 ++ .../api/java/typeutils/WritableTypeInfo.java | 154 ++ .../typeutils/runtime/WritableComparator.java | 188 ++ .../typeutils/runtime/WritableSerializer.java | 152 ++ .../flink/hadoopcompatibility/HadoopInputs.java | 118 ++ 
.../flink/hadoopcompatibility/HadoopUtils.java | 52 + .../mapred/HadoopMapFunction.java | 133 ++ .../mapred/HadoopReduceCombineFunction.java | 168 ++ .../mapred/HadoopReduceFunction.java | 142 ++ .../mapred/wrapper/HadoopOutputCollector.java | 59 + .../wrapper/HadoopTupleUnwrappingIterator.java | 94 + .../scala/HadoopInputs.scala | 143 ++ .../java/typeutils/WritableExtractionTest.java | 206 ++ .../java/typeutils/WritableInfoParserTest.java | 84 + .../java/typeutils/WritableTypeInfoTest.java | 72 + .../typeutils/runtime/StringArrayWritable.java | 83 + .../runtime/WritableComparatorTest.java | 53 + .../runtime/WritableComparatorUUIDTest.java | 46 + .../api/java/typeutils/runtime/WritableID.java | 78 + .../runtime/WritableSerializerTest.java | 50 + .../runtime/WritableSerializerUUIDTest.java | 50 + .../hadoopcompatibility/HadoopUtilsTest.java | 34 + .../mapred/HadoopMapFunctionITCase.java | 182 ++ .../mapred/HadoopMapredITCase.java | 47 + .../HadoopReduceCombineFunctionITCase.java | 265 +++ .../mapred/HadoopReduceFunctionITCase.java | 213 ++ .../mapred/HadoopTestData.java | 62 + .../example/HadoopMapredCompatWordCount.java | 133 ++ .../HadoopTupleUnwrappingIteratorTest.java | 139 ++ .../mapreduce/HadoopInputOutputITCase.java | 47 + .../mapreduce/example/WordCount.java | 119 ++ .../src/test/resources/log4j-test.properties | 27 + .../src/test/resources/logback-test.xml | 29 + flink-connectors/flink-hbase/pom.xml | 264 +++ .../flink/addons/hbase/TableInputFormat.java | 289 +++ .../flink/addons/hbase/TableInputSplit.java | 89 + .../hbase/HBaseTestingClusterAutostarter.java | 238 +++ .../addons/hbase/TableInputFormatITCase.java | 120 ++ .../hbase/example/HBaseFlinkTestConstants.java | 28 + .../addons/hbase/example/HBaseReadExample.java | 92 + .../addons/hbase/example/HBaseWriteExample.java | 202 ++ .../hbase/example/HBaseWriteStreamExample.java | 113 + .../src/test/resources/log4j-test.properties | 23 + flink-connectors/flink-hcatalog/pom.xml | 182 ++ 
.../flink/hcatalog/HCatInputFormatBase.java | 410 ++++ .../flink/hcatalog/java/HCatInputFormat.java | 160 ++ .../flink/hcatalog/scala/HCatInputFormat.scala | 229 ++ flink-connectors/flink-jdbc/pom.xml | 66 + .../flink/api/java/io/jdbc/JDBCInputFormat.java | 404 ++++ .../api/java/io/jdbc/JDBCOutputFormat.java | 315 +++ .../split/GenericParameterValuesProvider.java | 44 + .../split/NumericBetweenParametersProvider.java | 66 + .../io/jdbc/split/ParameterValuesProvider.java | 35 + .../flink/api/java/io/jdbc/JDBCFullTest.java | 101 + .../api/java/io/jdbc/JDBCInputFormatTest.java | 247 +++ .../api/java/io/jdbc/JDBCOutputFormatTest.java | 169 ++ .../flink/api/java/io/jdbc/JDBCTestBase.java | 183 ++ .../src/test/resources/log4j-test.properties | 19 + .../src/test/resources/logback-test.xml | 29 + flink-connectors/pom.xml | 75 + .../flink-connector-cassandra/pom.xml | 179 -- .../cassandra/CassandraInputFormat.java | 131 -- .../cassandra/CassandraOutputFormat.java | 125 -- .../cassandra/CassandraCommitter.java | 151 -- .../connectors/cassandra/CassandraPojoSink.java | 67 - .../connectors/cassandra/CassandraSink.java | 329 --- .../connectors/cassandra/CassandraSinkBase.java | 98 - .../cassandra/CassandraTupleSink.java | 59 - .../cassandra/CassandraTupleWriteAheadSink.java | 159 -- .../connectors/cassandra/ClusterBuilder.java | 43 - .../cassandra/example/BatchExample.java | 77 - .../cassandra/CassandraConnectorITCase.java | 440 ---- .../CassandraTupleWriteAheadSinkTest.java | 127 -- .../streaming/connectors/cassandra/Pojo.java | 65 - .../example/CassandraPojoSinkExample.java | 62 - .../example/CassandraTupleSinkExample.java | 62 - .../CassandraTupleWriteAheadSinkExample.java | 96 - .../connectors/cassandra/example/Message.java | 56 - .../src/test/resources/cassandra.yaml | 43 - .../src/test/resources/log4j-test.properties | 29 - .../flink-connector-elasticsearch/pom.xml | 90 - .../elasticsearch/ElasticsearchSink.java | 315 --- .../elasticsearch/IndexRequestBuilder.java | 66 - 
.../elasticsearch/ElasticsearchSinkITCase.java | 205 -- .../examples/ElasticsearchExample.java | 80 - .../src/test/resources/log4j-test.properties | 27 - .../src/test/resources/logback-test.xml | 30 - .../flink-connector-elasticsearch2/pom.xml | 83 - .../elasticsearch2/BulkProcessorIndexer.java | 35 - .../elasticsearch2/ElasticsearchSink.java | 257 --- .../ElasticsearchSinkFunction.java | 60 - .../elasticsearch2/RequestIndexer.java | 25 - .../elasticsearch2/ElasticsearchSinkITCase.java | 233 -- .../examples/ElasticsearchExample.java | 90 - .../src/test/resources/log4j-test.properties | 27 - .../src/test/resources/logback-test.xml | 30 - .../flink-connector-filesystem/pom.xml | 163 -- .../connectors/fs/AvroKeyValueSinkWriter.java | 309 --- .../flink/streaming/connectors/fs/Bucketer.java | 55 - .../flink/streaming/connectors/fs/Clock.java | 33 - .../connectors/fs/DateTimeBucketer.java | 126 -- .../connectors/fs/NonRollingBucketer.java | 47 - .../streaming/connectors/fs/RollingSink.java | 916 -------- .../connectors/fs/SequenceFileWriter.java | 151 -- .../connectors/fs/StreamWriterBase.java | 152 -- .../streaming/connectors/fs/StringWriter.java | 86 - .../streaming/connectors/fs/SystemClock.java | 29 - .../flink/streaming/connectors/fs/Writer.java | 73 - .../fs/bucketing/BasePathBucketer.java | 39 - .../connectors/fs/bucketing/Bucketer.java | 47 - .../connectors/fs/bucketing/BucketingSink.java | 1082 ---------- .../fs/bucketing/DateTimeBucketer.java | 102 - .../src/main/resources/log4j.properties | 27 - .../fs/RollingSinkFaultToleranceITCase.java | 300 --- .../connectors/fs/RollingSinkITCase.java | 991 --------- .../connectors/fs/RollingSinkSecuredITCase.java | 252 --- .../BucketingSinkFaultToleranceITCase.java | 297 --- .../fs/bucketing/BucketingSinkTest.java | 867 -------- .../src/test/resources/log4j-test.properties | 29 - .../src/test/resources/logback-test.xml | 30 - .../flink-connector-flume/pom.xml | 175 -- .../streaming/connectors/flume/FlumeSink.java | 141 -- 
.../flink-connector-kafka-0.10/pom.xml | 205 -- .../connectors/kafka/FlinkKafkaConsumer010.java | 153 -- .../connectors/kafka/FlinkKafkaProducer010.java | 398 ---- .../kafka/Kafka010JsonTableSource.java | 71 - .../connectors/kafka/Kafka010TableSource.java | 75 - .../kafka/internal/Kafka010Fetcher.java | 104 - .../internal/KafkaConsumerCallBridge010.java | 40 - .../src/main/resources/log4j.properties | 29 - .../connectors/kafka/Kafka010FetcherTest.java | 484 ----- .../connectors/kafka/Kafka010ITCase.java | 313 --- .../kafka/Kafka010ProducerITCase.java | 33 - .../kafka/KafkaTestEnvironmentImpl.java | 420 ---- .../src/test/resources/log4j-test.properties | 30 - .../src/test/resources/logback-test.xml | 30 - .../flink-connector-kafka-0.8/pom.xml | 219 -- .../connectors/kafka/FlinkKafkaConsumer08.java | 398 ---- .../connectors/kafka/FlinkKafkaConsumer081.java | 39 - .../connectors/kafka/FlinkKafkaConsumer082.java | 39 - .../connectors/kafka/FlinkKafkaProducer.java | 64 - .../connectors/kafka/FlinkKafkaProducer08.java | 145 -- .../connectors/kafka/Kafka08JsonTableSink.java | 52 - .../kafka/Kafka08JsonTableSource.java | 71 - .../connectors/kafka/Kafka08TableSource.java | 75 - .../kafka/internals/ClosableBlockingQueue.java | 507 ----- .../kafka/internals/Kafka08Fetcher.java | 481 ----- .../kafka/internals/KillerWatchDog.java | 62 - .../kafka/internals/PartitionInfoFetcher.java | 66 - .../internals/PeriodicOffsetCommitter.java | 85 - .../kafka/internals/SimpleConsumerThread.java | 504 ----- .../kafka/internals/ZookeeperOffsetHandler.java | 164 -- .../connectors/kafka/Kafka08ITCase.java | 248 --- .../kafka/Kafka08JsonTableSinkTest.java | 48 - .../kafka/Kafka08JsonTableSourceTest.java | 45 - .../connectors/kafka/Kafka08ProducerITCase.java | 32 - .../connectors/kafka/KafkaConsumer08Test.java | 139 -- .../connectors/kafka/KafkaLocalSystemTime.java | 48 - .../connectors/kafka/KafkaProducerTest.java | 123 -- .../kafka/KafkaShortRetention08ITCase.java | 34 - 
.../kafka/KafkaTestEnvironmentImpl.java | 401 ---- .../internals/ClosableBlockingQueueTest.java | 603 ------ .../src/test/resources/log4j-test.properties | 30 - .../src/test/resources/logback-test.xml | 30 - .../flink-connector-kafka-0.9/pom.xml | 212 -- .../connectors/kafka/FlinkKafkaConsumer09.java | 269 --- .../connectors/kafka/FlinkKafkaProducer09.java | 137 -- .../connectors/kafka/Kafka09JsonTableSink.java | 50 - .../kafka/Kafka09JsonTableSource.java | 71 - .../connectors/kafka/Kafka09TableSource.java | 75 - .../connectors/kafka/internal/Handover.java | 214 -- .../kafka/internal/Kafka09Fetcher.java | 241 --- .../kafka/internal/KafkaConsumerCallBridge.java | 41 - .../kafka/internal/KafkaConsumerThread.java | 332 --- .../src/main/resources/log4j.properties | 29 - .../connectors/kafka/Kafka09FetcherTest.java | 482 ----- .../connectors/kafka/Kafka09ITCase.java | 129 -- .../kafka/Kafka09JsonTableSinkTest.java | 48 - .../kafka/Kafka09JsonTableSourceTest.java | 45 - .../connectors/kafka/Kafka09ProducerITCase.java | 32 - .../kafka/Kafka09SecuredRunITCase.java | 62 - .../connectors/kafka/KafkaProducerTest.java | 126 -- .../kafka/KafkaTestEnvironmentImpl.java | 439 ---- .../connectors/kafka/internal/HandoverTest.java | 387 ---- .../src/test/resources/log4j-test.properties | 32 - .../src/test/resources/logback-test.xml | 30 - .../flink-connector-kafka-base/pom.xml | 212 -- .../kafka/FlinkKafkaConsumerBase.java | 552 ----- .../kafka/FlinkKafkaProducerBase.java | 386 ---- .../connectors/kafka/KafkaJsonTableSink.java | 47 - .../connectors/kafka/KafkaJsonTableSource.java | 97 - .../connectors/kafka/KafkaTableSink.java | 127 -- .../connectors/kafka/KafkaTableSource.java | 155 -- .../kafka/internals/AbstractFetcher.java | 552 ----- .../kafka/internals/ExceptionProxy.java | 125 -- .../kafka/internals/KafkaTopicPartition.java | 120 -- .../internals/KafkaTopicPartitionLeader.java | 98 - .../internals/KafkaTopicPartitionState.java | 118 -- 
...picPartitionStateWithPeriodicWatermarks.java | 71 - ...cPartitionStateWithPunctuatedWatermarks.java | 84 - .../connectors/kafka/internals/TypeUtil.java | 38 - .../internals/metrics/KafkaMetricWrapper.java | 37 - .../kafka/partitioner/FixedPartitioner.java | 76 - .../kafka/partitioner/KafkaPartitioner.java | 41 - .../JSONDeserializationSchema.java | 46 - .../JSONKeyValueDeserializationSchema.java | 72 - .../JsonRowDeserializationSchema.java | 135 -- .../JsonRowSerializationSchema.java | 70 - .../KeyedDeserializationSchema.java | 52 - .../KeyedDeserializationSchemaWrapper.java | 51 - .../serialization/KeyedSerializationSchema.java | 55 - .../KeyedSerializationSchemaWrapper.java | 48 - ...eInformationKeyValueSerializationSchema.java | 196 -- .../kafka/FlinkKafkaConsumerBaseTest.java | 416 ---- .../kafka/FlinkKafkaProducerBaseTest.java | 288 --- .../kafka/JSONDeserializationSchemaTest.java | 41 - .../JSONKeyValueDeserializationSchemaTest.java | 68 - .../kafka/JsonRowDeserializationSchemaTest.java | 124 -- .../kafka/JsonRowSerializationSchemaTest.java | 98 - .../KafkaConsumerPartitionAssignmentTest.java | 269 --- .../connectors/kafka/KafkaConsumerTestBase.java | 2006 ------------------ .../connectors/kafka/KafkaProducerTestBase.java | 193 -- .../kafka/KafkaShortRetentionTestBase.java | 291 --- .../kafka/KafkaTableSinkTestBase.java | 106 - .../kafka/KafkaTableSourceTestBase.java | 77 - .../connectors/kafka/KafkaTestBase.java | 203 -- .../connectors/kafka/KafkaTestEnvironment.java | 112 - .../connectors/kafka/TestFixedPartitioner.java | 104 - .../AbstractFetcherTimestampsTest.java | 320 --- .../internals/KafkaTopicPartitionTest.java | 57 - .../kafka/testutils/DataGenerators.java | 227 -- .../kafka/testutils/FailingIdentityMapper.java | 115 - .../testutils/FakeStandardProducerConfig.java | 34 - .../testutils/JobManagerCommunicationUtils.java | 120 -- .../testutils/PartitionValidatingMapper.java | 53 - .../kafka/testutils/ThrottledMapper.java | 44 - 
.../kafka/testutils/Tuple2Partitioner.java | 48 - .../testutils/ValidatingExactlyOnceSink.java | 82 - .../testutils/ZooKeeperStringSerializer.java | 51 - .../src/test/resources/log4j-test.properties | 29 - .../src/test/resources/logback-test.xml | 30 - .../flink-connector-kinesis/pom.xml | 164 -- .../kinesis/FlinkKinesisConsumer.java | 304 --- .../kinesis/FlinkKinesisProducer.java | 292 --- .../connectors/kinesis/KinesisPartitioner.java | 49 - .../kinesis/config/AWSConfigConstants.java | 70 - .../kinesis/config/ConsumerConfigConstants.java | 138 -- .../kinesis/config/ProducerConfigConstants.java | 33 - .../kinesis/examples/ConsumeFromKinesis.java | 54 - .../kinesis/examples/ProduceIntoKinesis.java | 77 - .../kinesis/internals/KinesisDataFetcher.java | 679 ------ .../kinesis/internals/ShardConsumer.java | 287 --- .../kinesis/model/KinesisStreamShard.java | 133 -- .../kinesis/model/KinesisStreamShardState.java | 71 - .../kinesis/model/SentinelSequenceNumber.java | 51 - .../kinesis/model/SequenceNumber.java | 104 - .../kinesis/proxy/GetShardListResult.java | 75 - .../connectors/kinesis/proxy/KinesisProxy.java | 338 --- .../kinesis/proxy/KinesisProxyInterface.java | 69 - .../KinesisDeserializationSchema.java | 57 - .../KinesisDeserializationSchemaWrapper.java | 57 - .../KinesisSerializationSchema.java | 45 - .../connectors/kinesis/util/AWSUtil.java | 130 -- .../kinesis/util/KinesisConfigUtil.java | 218 -- .../src/main/resources/log4j.properties | 27 - .../kinesis/FlinkKinesisConsumerTest.java | 472 ----- .../internals/KinesisDataFetcherTest.java | 510 ----- .../kinesis/internals/ShardConsumerTest.java | 122 -- .../manualtests/ManualConsumerProducerTest.java | 121 -- .../manualtests/ManualExactlyOnceTest.java | 147 -- ...nualExactlyOnceWithStreamReshardingTest.java | 247 --- .../kinesis/manualtests/ManualProducerTest.java | 91 - .../ExactlyOnceValidatingConsumerThread.java | 155 -- .../testutils/FakeKinesisBehavioursFactory.java | 262 --- 
.../KinesisEventsGeneratorProducerThread.java | 118 -- .../testutils/KinesisShardIdGenerator.java | 25 - .../testutils/TestableFlinkKinesisConsumer.java | 60 - .../testutils/TestableKinesisDataFetcher.java | 122 -- .../flink-connector-nifi/pom.xml | 89 - .../connectors/nifi/NiFiDataPacket.java | 39 - .../connectors/nifi/NiFiDataPacketBuilder.java | 34 - .../streaming/connectors/nifi/NiFiSink.java | 74 - .../streaming/connectors/nifi/NiFiSource.java | 155 -- .../connectors/nifi/StandardNiFiDataPacket.java | 46 - .../nifi/examples/NiFiSinkTopologyExample.java | 55 - .../examples/NiFiSourceTopologyExample.java | 58 - .../src/test/resources/NiFi_Flink.xml | 16 - .../flink-connector-rabbitmq/pom.xml | 60 - .../streaming/connectors/rabbitmq/RMQSink.java | 142 -- .../connectors/rabbitmq/RMQSource.java | 243 --- .../rabbitmq/common/RMQConnectionConfig.java | 448 ---- .../connectors/rabbitmq/RMQSourceTest.java | 419 ---- .../common/RMQConnectionConfigTest.java | 69 - .../connectors/rabbitmq/common/RMQSinkTest.java | 125 -- .../flink-connector-redis/pom.xml | 79 - .../streaming/connectors/redis/RedisSink.java | 188 -- .../common/config/FlinkJedisClusterConfig.java | 187 -- .../common/config/FlinkJedisConfigBase.java | 90 - .../common/config/FlinkJedisPoolConfig.java | 224 -- .../common/config/FlinkJedisSentinelConfig.java | 259 --- .../common/container/RedisClusterContainer.java | 171 -- .../container/RedisCommandsContainer.java | 115 - .../RedisCommandsContainerBuilder.java | 116 - .../redis/common/container/RedisContainer.java | 252 --- .../redis/common/mapper/RedisCommand.java | 86 - .../common/mapper/RedisCommandDescription.java | 94 - .../redis/common/mapper/RedisDataType.java | 66 - .../redis/common/mapper/RedisMapper.java | 66 - .../connectors/redis/RedisITCaseBase.java | 45 - .../redis/RedisSentinelClusterTest.java | 100 - .../connectors/redis/RedisSinkITCase.java | 233 -- .../redis/RedisSinkPublishITCase.java | 137 -- .../connectors/redis/RedisSinkTest.java | 144 -- 
.../common/config/FlinkJedisConfigBaseTest.java | 50 - .../common/config/JedisClusterConfigTest.java | 49 - .../common/config/JedisPoolConfigTest.java | 29 - .../common/config/JedisSentinelConfigTest.java | 49 - .../mapper/RedisDataTypeDescriptionTest.java | 41 - .../flink-connector-twitter/pom.xml | 96 - .../connectors/twitter/TwitterSource.java | 217 -- flink-streaming-connectors/pom.xml | 70 - pom.xml | 9 +- 687 files changed, 53344 insertions(+), 53385 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index e18629d..6cbccc1 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,7 @@ tmp *.log .DS_Store build-target -flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/ +flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/ flink-runtime-web/web-dashboard/assets/fonts/ flink-runtime-web/web-dashboard/node_modules/ flink-runtime-web/web-dashboard/bower_components/ http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/pom.xml b/flink-batch-connectors/flink-avro/pom.xml deleted file mode 100644 index 1161173..0000000 --- a/flink-batch-connectors/flink-avro/pom.xml +++ /dev/null @@ -1,216 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-batch-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-avro_2.10</artifactId> - <name>flink-avro</name> - - <packaging>jar</packaging> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <!-- version is derived from base module --> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils-junit</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - 
<version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>create-test-dependency</id> - <phase>process-test-classes</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <archive> - <manifest> - <mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass> - </manifest> - </archive> - <finalName>maven</finalName> - <attach>false</attach> - <descriptors> - <descriptor>src/test/assembly/test-assembly.xml</descriptor> - </descriptors> - </configuration> - </execution> - </executions> - </plugin> - <!--Remove the AvroExternalJarProgram code from the test-classes directory since it musn't be in the - classpath when running the tests to actually test whether the user code class loader - is properly used.--> - <plugin> - <artifactId>maven-clean-plugin</artifactId> - <version>2.5</version><!--$NO-MVN-MAN-VER$--> - <executions> - <execution> - <id>remove-avroexternalprogram</id> - <phase>process-test-classes</phase> - <goals> - <goal>clean</goal> - </goals> - <configuration> - <excludeDefaultDirectories>true</excludeDefaultDirectories> - <filesets> - <fileset> - <directory>${project.build.testOutputDirectory}</directory> - <includes> - <include>**/testjar/*.class</include> - </includes> - </fileset> - </filesets> - </configuration> - </execution> - </executions> - </plugin> - <!-- Generate Test class from avro schema --> - <plugin> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <version>1.7.7</version> - <executions> - <execution> - <phase>generate-sources</phase> - <goals> - <goal>schema</goal> - </goals> - <configuration> - <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> - <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory> - </configuration> - </execution> - 
</executions> - </plugin> - </plugins> - - <pluginManagement> - <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - <versionRange>[2.4,)</versionRange> - <goals> - <goal>single</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-clean-plugin</artifactId> - <versionRange>[1,)</versionRange> - <goals> - <goal>clean</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <versionRange>[1.7.7,)</versionRange> - <goals> - <goal>schema</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java deleted file mode 100644 index 59da4cb..0000000 --- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.avro.io.Decoder; -import org.apache.avro.util.Utf8; - - -public class DataInputDecoder extends Decoder { - - private final Utf8 stringDecoder = new Utf8(); - - private DataInput in; - - public void setIn(DataInput in) { - this.in = in; - } - - // -------------------------------------------------------------------------------------------- - // primitives - // -------------------------------------------------------------------------------------------- - - @Override - public void readNull() {} - - - @Override - public boolean readBoolean() throws IOException { - return in.readBoolean(); - } - - @Override - public int readInt() throws IOException { - return in.readInt(); - } - - @Override - public long readLong() throws IOException { - return in.readLong(); - } - - @Override - public float readFloat() throws IOException { - return in.readFloat(); - } - - @Override - public double readDouble() 
throws IOException { - return in.readDouble(); - } - - @Override - public int readEnum() throws IOException { - return readInt(); - } - - // -------------------------------------------------------------------------------------------- - // bytes - // -------------------------------------------------------------------------------------------- - - @Override - public void readFixed(byte[] bytes, int start, int length) throws IOException { - in.readFully(bytes, start, length); - } - - @Override - public ByteBuffer readBytes(ByteBuffer old) throws IOException { - int length = readInt(); - ByteBuffer result; - if (old != null && length <= old.capacity() && old.hasArray()) { - result = old; - result.clear(); - } else { - result = ByteBuffer.allocate(length); - } - in.readFully(result.array(), result.arrayOffset() + result.position(), length); - result.limit(length); - return result; - } - - - @Override - public void skipFixed(int length) throws IOException { - skipBytes(length); - } - - @Override - public void skipBytes() throws IOException { - int num = readInt(); - skipBytes(num); - } - - // -------------------------------------------------------------------------------------------- - // strings - // -------------------------------------------------------------------------------------------- - - - @Override - public Utf8 readString(Utf8 old) throws IOException { - int length = readInt(); - Utf8 result = (old != null ? 
old : new Utf8()); - result.setByteLength(length); - - if (length > 0) { - in.readFully(result.getBytes(), 0, length); - } - - return result; - } - - @Override - public String readString() throws IOException { - return readString(stringDecoder).toString(); - } - - @Override - public void skipString() throws IOException { - int len = readInt(); - skipBytes(len); - } - - // -------------------------------------------------------------------------------------------- - // collection types - // -------------------------------------------------------------------------------------------- - - @Override - public long readArrayStart() throws IOException { - return readVarLongCount(in); - } - - @Override - public long arrayNext() throws IOException { - return readVarLongCount(in); - } - - @Override - public long skipArray() throws IOException { - return readVarLongCount(in); - } - - @Override - public long readMapStart() throws IOException { - return readVarLongCount(in); - } - - @Override - public long mapNext() throws IOException { - return readVarLongCount(in); - } - - @Override - public long skipMap() throws IOException { - return readVarLongCount(in); - } - - // -------------------------------------------------------------------------------------------- - // union - // -------------------------------------------------------------------------------------------- - - @Override - public int readIndex() throws IOException { - return readInt(); - } - - // -------------------------------------------------------------------------------------------- - // utils - // -------------------------------------------------------------------------------------------- - - private void skipBytes(int num) throws IOException { - while (num > 0) { - num -= in.skipBytes(num); - } - } - - public static long readVarLongCount(DataInput in) throws IOException { - long value = in.readUnsignedByte(); - - if ((value & 0x80) == 0) { - return value; - } - else { - long curr; - int shift = 7; - value = 
value & 0x7f; - while (((curr = in.readUnsignedByte()) & 0x80) != 0){ - value |= (curr & 0x7f) << shift; - shift += 7; - } - value |= curr << shift; - return value; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java deleted file mode 100644 index 0102cc1..0000000 --- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.avro; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.avro.io.Encoder; -import org.apache.avro.util.Utf8; - - -public final class DataOutputEncoder extends Encoder implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - private DataOutput out; - - - public void setOut(DataOutput out) { - this.out = out; - } - - - @Override - public void flush() throws IOException {} - - // -------------------------------------------------------------------------------------------- - // primitives - // -------------------------------------------------------------------------------------------- - - @Override - public void writeNull() {} - - - @Override - public void writeBoolean(boolean b) throws IOException { - out.writeBoolean(b); - } - - @Override - public void writeInt(int n) throws IOException { - out.writeInt(n); - } - - @Override - public void writeLong(long n) throws IOException { - out.writeLong(n); - } - - @Override - public void writeFloat(float f) throws IOException { - out.writeFloat(f); - } - - @Override - public void writeDouble(double d) throws IOException { - out.writeDouble(d); - } - - @Override - public void writeEnum(int e) throws IOException { - out.writeInt(e); - } - - - // -------------------------------------------------------------------------------------------- - // bytes - // -------------------------------------------------------------------------------------------- - - @Override - public void writeFixed(byte[] bytes, int start, int len) throws IOException { - out.write(bytes, start, len); - } - - @Override - public void writeBytes(byte[] bytes, int start, int len) throws IOException { - out.writeInt(len); - if (len > 0) { - out.write(bytes, start, len); - } - } - - @Override - public void writeBytes(ByteBuffer bytes) throws IOException { - int num = bytes.remaining(); - out.writeInt(num); - - if (num > 0) { - 
writeFixed(bytes); - } - } - - // -------------------------------------------------------------------------------------------- - // strings - // -------------------------------------------------------------------------------------------- - - @Override - public void writeString(String str) throws IOException { - byte[] bytes = Utf8.getBytesFor(str); - writeBytes(bytes, 0, bytes.length); - } - - @Override - public void writeString(Utf8 utf8) throws IOException { - writeBytes(utf8.getBytes(), 0, utf8.getByteLength()); - - } - - // -------------------------------------------------------------------------------------------- - // collection types - // -------------------------------------------------------------------------------------------- - - @Override - public void writeArrayStart() {} - - @Override - public void setItemCount(long itemCount) throws IOException { - if (itemCount > 0) { - writeVarLongCount(out, itemCount); - } - } - - @Override - public void startItem() {} - - @Override - public void writeArrayEnd() throws IOException { - // write a single byte 0, shortcut for a var-length long of 0 - out.write(0); - } - - @Override - public void writeMapStart() {} - - @Override - public void writeMapEnd() throws IOException { - // write a single byte 0, shortcut for a var-length long of 0 - out.write(0); - } - - // -------------------------------------------------------------------------------------------- - // union - // -------------------------------------------------------------------------------------------- - - @Override - public void writeIndex(int unionIndex) throws IOException { - out.writeInt(unionIndex); - } - - // -------------------------------------------------------------------------------------------- - // utils - // -------------------------------------------------------------------------------------------- - - - public static void writeVarLongCount(DataOutput out, long val) throws IOException { - if (val < 0) { - throw new IOException("Illegal count 
(must be non-negative): " + val); - } - - while ((val & ~0x7FL) != 0) { - out.write(((int) val) | 0x80); - val >>>= 7; - } - out.write((int) val); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java deleted file mode 100644 index 709c4f1..0000000 --- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.avro; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.avro.file.SeekableInput; -import org.apache.flink.core.fs.FSDataInputStream; - - -/** - * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well) - * - * The wrapper keeps track of the position in the data stream. 
- */ -public class FSDataInputStreamWrapper implements Closeable, SeekableInput { - private final FSDataInputStream stream; - private long pos; - private long len; - - public FSDataInputStreamWrapper(FSDataInputStream stream, long len) { - this.stream = stream; - this.pos = 0; - this.len = len; - } - - public long length() throws IOException { - return this.len; - } - - public int read(byte[] b, int off, int len) throws IOException { - int read; - read = stream.read(b, off, len); - pos += read; - return read; - } - - public void seek(long p) throws IOException { - stream.seek(p); - pos = p; - } - - public long tell() throws IOException { - return pos; - } - - public void close() throws IOException { - stream.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java deleted file mode 100644 index 73067c1..0000000 --- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.io; - -import java.io.IOException; - -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.SeekableInput; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.flink.api.common.io.CheckpointableInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.avro.FSDataInputStreamWrapper; -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.InstantiationUtil; - -/** - * Provides a {@link FileInputFormat} for Avro records. - * - * @param <E> - * the type of the result Avro record. If you specify - * {@link GenericRecord} then the result will be returned as a - * {@link GenericRecord}, so you do not have to know the schema ahead - * of time. 
- */ -public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>, - CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class); - - private final Class<E> avroValueType; - - private boolean reuseAvroValue = true; - - private transient DataFileReader<E> dataFileReader; - - private transient long end; - - private transient long recordsReadSinceLastSync; - - private long lastSync = -1l; - - public AvroInputFormat(Path filePath, Class<E> type) { - super(filePath); - this.avroValueType = type; - } - - /** - * Sets the flag whether to reuse the Avro value instance for all records. - * By default, the input format reuses the Avro value. - * - * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise. - */ - public void setReuseAvroValue(boolean reuseAvroValue) { - this.reuseAvroValue = reuseAvroValue; - } - - /** - * If set, the InputFormat will only read entire files. 
- */ - public void setUnsplittable(boolean unsplittable) { - this.unsplittable = unsplittable; - } - - // -------------------------------------------------------------------------------------------- - // Typing - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<E> getProducedType() { - return TypeExtractor.getForClass(this.avroValueType); - } - - // -------------------------------------------------------------------------------------------- - // Input Format Methods - // -------------------------------------------------------------------------------------------- - - @Override - public void open(FileInputSplit split) throws IOException { - super.open(split); - dataFileReader = initReader(split); - dataFileReader.sync(split.getStart()); - lastSync = dataFileReader.previousSync(); - } - - private DataFileReader<E> initReader(FileInputSplit split) throws IOException { - DatumReader<E> datumReader; - - if (org.apache.avro.generic.GenericRecord.class == avroValueType) { - datumReader = new GenericDatumReader<E>(); - } else { - datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType) - ? 
new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType); - } - if (LOG.isInfoEnabled()) { - LOG.info("Opening split {}", split); - } - - SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen()); - DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader); - - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema()); - } - - end = split.getStart() + split.getLength(); - recordsReadSinceLastSync = 0; - return dataFileReader; - } - - @Override - public boolean reachedEnd() throws IOException { - return !dataFileReader.hasNext() || dataFileReader.pastSync(end); - } - - public long getRecordsReadFromBlock() { - return this.recordsReadSinceLastSync; - } - - @Override - public E nextRecord(E reuseValue) throws IOException { - if (reachedEnd()) { - return null; - } - - // if we start a new block, then register the event, and - // restart the counter. 
- if(dataFileReader.previousSync() != lastSync) { - lastSync = dataFileReader.previousSync(); - recordsReadSinceLastSync = 0; - } - recordsReadSinceLastSync++; - - if (reuseAvroValue) { - return dataFileReader.next(reuseValue); - } else { - if (GenericRecord.class == avroValueType) { - return dataFileReader.next(); - } else { - return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class)); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Checkpointing - // -------------------------------------------------------------------------------------------- - - @Override - public Tuple2<Long, Long> getCurrentState() throws IOException { - return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync); - } - - @Override - public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException { - Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); - Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); - - try { - this.open(split); - } finally { - if (state.f0 != -1) { - lastSync = state.f0; - recordsReadSinceLastSync = state.f1; - } - } - - if (lastSync != -1) { - // open and read until the record we were before - // the checkpoint and discard the values - dataFileReader.seek(lastSync); - for(int i = 0; i < recordsReadSinceLastSync; i++) { - dataFileReader.next(null); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java deleted file mode 100644 index 600d1e5..0000000 --- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java +++ /dev/null @@ -1,189 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.api.java.io;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.core.fs.Path;

import static org.apache.flink.util.Preconditions.checkNotNull;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/**
 * A {@link FileOutputFormat} that writes records of type {@code E} into an Avro data file
 * through a {@link DataFileWriter}.
 *
 * <p>Types assignable to {@code SpecificRecordBase} are written with a
 * {@link SpecificDatumWriter}; every other type is written with a {@link ReflectDatumWriter}.
 * A schema set via {@link #setSchema(Schema)} takes precedence over the schema derived
 * from the value type.
 *
 * <p>The schema and the codec are kept in {@code transient} fields and are carried across
 * Java serialization by the custom {@code writeObject} / {@code readObject} methods.
 *
 * @param <E> type of the records written by this format
 */
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {

	/**
	 * Wrapper which encapsulates the supported codec and a related serialization byte.
	 */
	public enum Codec {

		NULL((byte) 0, CodecFactory.nullCodec()),
		SNAPPY((byte) 1, CodecFactory.snappyCodec()),
		BZIP2((byte) 2, CodecFactory.bzip2Codec()),
		DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
		XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));

		/** Byte that identifies this codec in the serialized form of the output format. */
		private final byte codecByte;

		/** Avro factory handed to the {@link DataFileWriter} when this codec is selected. */
		private final CodecFactory codecFactory;

		Codec(final byte codecByte, final CodecFactory codecFactory) {
			this.codecByte = codecByte;
			this.codecFactory = codecFactory;
		}

		private byte getCodecByte() {
			return codecByte;
		}

		private CodecFactory getCodecFactory() {
			return codecFactory;
		}

		/**
		 * Looks up the codec registered for the given serialization byte.
		 *
		 * @param codecByte byte previously obtained from {@code getCodecByte()}
		 * @return the matching codec
		 * @throws IllegalArgumentException if no codec uses that byte
		 */
		private static Codec forCodecByte(byte codecByte) {
			for (final Codec codec : Codec.values()) {
				if (codec.getCodecByte() == codecByte) {
					return codec;
				}
			}
			throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
		}
	}

	private static final long serialVersionUID = 1L;

	private final Class<E> avroValueType;

	// The three fields below are transient: schema and codec are written/read explicitly
	// in writeObject/readObject, the writer is created in open().
	private transient Schema userDefinedSchema = null;

	private transient Codec codec = null;

	private transient DataFileWriter<E> dataFileWriter;

	public AvroOutputFormat(Path filePath, Class<E> type) {
		super(filePath);
		this.avroValueType = type;
	}

	public AvroOutputFormat(Class<E> type) {
		this.avroValueType = type;
	}

	/**
	 * Appends the {@code .avro} extension to the file name chosen by the parent format.
	 */
	@Override
	protected String getDirectoryFileName(int taskNumber) {
		return super.getDirectoryFileName(taskNumber) + ".avro";
	}

	/**
	 * Sets the schema to write with instead of the one derived from the value type.
	 *
	 * @param schema schema passed to the writer in {@link #open(int, int)}
	 */
	public void setSchema(Schema schema) {
		this.userDefinedSchema = schema;
	}

	/**
	 * Set avro codec for compression.
	 *
	 * @param codec avro codec.
	 */
	public void setCodec(final Codec codec) {
		this.codec = checkNotNull(codec, "codec can not be null");
	}

	@Override
	public void writeRecord(E record) throws IOException {
		dataFileWriter.append(record);
	}

	/**
	 * Opens the parent output stream and creates the Avro writer on top of it.
	 *
	 * @throws RuntimeException if a specific record type cannot be instantiated to obtain
	 *                          its schema; the reflective exception is attached as cause
	 */
	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		super.open(taskNumber, numTasks);

		DatumWriter<E> datumWriter;
		Schema schema;
		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
			datumWriter = new SpecificDatumWriter<E>(avroValueType);
			try {
				schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
			} catch (InstantiationException | IllegalAccessException e) {
				// keep the original message but attach the cause so the stack trace is not lost
				throw new RuntimeException(e.getMessage(), e);
			}
		} else {
			datumWriter = new ReflectDatumWriter<E>(avroValueType);
			schema = ReflectData.get().getSchema(avroValueType);
		}

		dataFileWriter = new DataFileWriter<E>(datumWriter);
		if (codec != null) {
			dataFileWriter.setCodec(codec.getCodecFactory());
		}
		if (userDefinedSchema == null) {
			dataFileWriter.create(schema, stream);
		} else {
			dataFileWriter.create(userDefinedSchema, stream);
		}
	}

	/**
	 * Writes the default serialized form followed by the codec byte ({@code -1} when no codec
	 * is set) and the user-defined schema as length-prefixed UTF-8 JSON (length {@code 0}
	 * when no schema is set).
	 */
	private void writeObject(java.io.ObjectOutputStream out) throws IOException {
		out.defaultWriteObject();

		if (codec != null) {
			out.writeByte(codec.getCodecByte());
		} else {
			out.writeByte(-1);
		}

		if (userDefinedSchema != null) {
			// explicit charset: the reading JVM may have a different platform default
			byte[] json = userDefinedSchema.toString().getBytes(StandardCharsets.UTF_8);
			out.writeInt(json.length);
			out.write(json);
		} else {
			out.writeInt(0);
		}
	}

	/**
	 * Mirror of {@code writeObject}: restores the codec and the user-defined schema.
	 */
	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
		in.defaultReadObject();

		byte codecByte = in.readByte();
		if (codecByte >= 0) {
			setCodec(Codec.forCodecByte(codecByte));
		}

		int length = in.readInt();
		if (length != 0) {
			byte[] json = new byte[length];
			in.readFully(json);

			Schema schema = new Schema.Parser().parse(new String(json, StandardCharsets.UTF_8));
			setSchema(schema);
		}
	}

	/**
	 * Flushes and closes the Avro writer, then closes the parent format.
	 *
	 * <p>The parent is closed even if the writer fails, and a writer that was never created
	 * (because {@code open} did not complete) is skipped instead of raising a
	 * {@link NullPointerException}.
	 */
	@Override
	public void close() throws IOException {
		try {
			if (dataFileWriter != null) {
				dataFileWriter.flush();
				dataFileWriter.close();
			}
		} finally {
			super.close();
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml deleted file mode 100644 index 0f4561a..0000000 --- a/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml +++ /dev/null @@ -1,36 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License.
---> - -<assembly> - <id>test-jar</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${project.build.testOutputDirectory}</directory> - <outputDirectory>/</outputDirectory> - <!--modify/add include to match your package(s) --> - <includes> - <include>org/apache/flink/api/avro/testjar/**</include> - </includes> - </fileSet> - </fileSets> -</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java deleted file mode 100644 index 1030ff8..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.api.avro;

import java.io.File;

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;

import org.junit.Assert;
import org.junit.Test;

/**
 * Starts a local mini cluster and submits the packaged program in {@code maven-test-jar.jar}
 * to it through a {@link StandaloneClusterClient}, passing the Avro test data file as the
 * program's only argument.
 */
public class AvroExternalJarProgramITCase {

	private static final String JAR_FILE = "maven-test-jar.jar";

	private static final String TEST_DATA_FILE = "/testdata.avro";

	@Test
	public void testExternalProgram() {
		LocalFlinkMiniCluster cluster = null;

		try {
			Configuration config = new Configuration();
			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);

			cluster = new LocalFlinkMiniCluster(config, false);
			cluster.start();

			submitPackagedProgram(cluster, config);
		}
		catch (Throwable t) {
			System.err.println(t.getMessage());
			t.printStackTrace();
			Assert.fail("Error during the packaged program execution: " + t.getMessage());
		}
		finally {
			stopQuietly(cluster);
		}
	}

	/**
	 * Builds the packaged program, points {@code config} at the running cluster's job manager
	 * and runs the program with parallelism 4.
	 */
	private void submitPackagedProgram(LocalFlinkMiniCluster cluster, Configuration config) throws Exception {
		File jar = new File(JAR_FILE);
		String testDataUrl = getClass().getResource(TEST_DATA_FILE).toString();
		PackagedProgram program = new PackagedProgram(jar, new String[] { testDataUrl });

		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, cluster.getLeaderRPCPort());

		ClusterClient client = new StandaloneClusterClient(config);
		client.setPrintStatusDuringExecution(false);
		client.run(program, 4);
	}

	/**
	 * Stops the cluster if it was created; failures during shutdown are deliberately ignored.
	 */
	private static void stopQuietly(LocalFlinkMiniCluster cluster) {
		if (cluster == null) {
			return;
		}
		try {
			cluster.stop();
		} catch (Throwable ignored) {
			// ignore
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java deleted file mode 100644 index 3b01ccb..0000000 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.api.avro;

import org.junit.Assert;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.JavaProgramTestBase;

/**
 * Writes the same four users once as Avro specific records ({@link User}) and once as
 * reflect records ({@link ReflectiveUser}) through {@link AvroOutputFormat}, then reads the
 * produced files back and checks that every input user is present.
 */
@SuppressWarnings("serial")
public class AvroOutputFormatITCase extends JavaProgramTestBase {

	public static String outputPath1;

	public static String outputPath2;

	public static String inputPath;

	public static String userData = "alice|1|blue\n" +
		"bob|2|red\n" +
		"john|3|yellow\n" +
		"walt|4|black\n";

	@Override
	protected void preSubmit() throws Exception {
		inputPath = createTempFile("user", userData);
		outputPath1 = getTempDirPath("avro_output1");
		outputPath2 = getTempDirPath("avro_output2");
	}

	@Override
	protected void testProgram() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
			.fieldDelimiter("|")
			.types(String.class, Integer.class, String.class);

		//output the data with AvroOutputFormat for specific user type
		DataSet<User> specificUser = input.map(new ConvertToUser());
		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
		avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
		avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
		specificUser.write(avroOutputFormat, outputPath1);

		//output the data with AvroOutputFormat for reflect user type
		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);

		env.execute();
	}

	@Override
	protected void postSubmit() throws Exception {
		//compare result for specific user type
		File file1 = asFile(outputPath1);
		File[] output1 = outputFilesOf(file1);
		if (file1.isDirectory()) {
			// check for avro ext in dir.
			for (File avroOutput : output1) {
				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
			}
		}
		List<String> result1 = new ArrayList<String>();
		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
		for (File avroOutput : output1) {
			// try-with-resources: the reader holds an open file handle until closed
			try (DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1)) {
				while (dataFileReader1.hasNext()) {
					User user = dataFileReader1.next();
					result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
				}
			}
		}
		assertContainsAllUsers(result1);

		//compare result for reflect user type
		File[] output2 = outputFilesOf(asFile(outputPath2));
		List<String> result2 = new ArrayList<String>();
		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
		for (File avroOutput : output2) {
			try (DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2)) {
				while (dataFileReader2.hasNext()) {
					ReflectiveUser user = dataFileReader2.next();
					result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
				}
			}
		}
		assertContainsAllUsers(result2);
	}

	/**
	 * Returns the files to read for an output location: the directory's children when the
	 * location is a directory, otherwise the location itself.
	 */
	private static File[] outputFilesOf(File output) {
		if (!output.isDirectory()) {
			return new File[] {output};
		}
		File[] children = output.listFiles();
		// listFiles() returns null on an I/O error; fail with a message instead of an NPE later
		Assert.assertNotNull("Could not list output directory " + output, children);
		return children;
	}

	/**
	 * Asserts that every line of {@link #userData} occurs in the given result list.
	 */
	private static void assertContainsAllUsers(List<String> result) {
		for (String expectedResult : userData.split("\n")) {
			Assert.assertTrue("expected user " + expectedResult + " not found.", result.contains(expectedResult));
		}
	}

	/** Maps a (name, favorite number, favorite color) tuple to a {@link User}. */
	public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {

		@Override
		public User map(Tuple3<String, Integer, String> value) throws Exception {
			return new User(value.f0, value.f1, value.f2);
		}
	}

	/** Copies the fields of a {@link User} into a {@link ReflectiveUser}. */
	public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {

		@Override
		public ReflectiveUser map(User value) throws Exception {
			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
		}
	}

	/** Plain class with the same three fields as {@link User}, written via Avro reflection. */
	public static class ReflectiveUser {
		private String name;
		private int favoriteNumber;
		private String favoriteColor;

		public ReflectiveUser() {}

		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
			this.name = name;
			this.favoriteNumber = favoriteNumber;
			this.favoriteColor = favoriteColor;
		}

		public String getName() {
			return this.name;
		}

		public String getFavoriteColor() {
			return this.favoriteColor;
		}

		public int getFavoriteNumber() {
			return this.favoriteNumber;
		}
	}
}
