[FLINK-4676] [connectors] Merge batch and streaming connectors into common 
Maven module.

This closes #2897.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de4fe3b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de4fe3b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de4fe3b7

Branch: refs/heads/master
Commit: de4fe3b7392948807753d65d13f3da968e6c7de0
Parents: cc006ff
Author: Fabian Hueske <[email protected]>
Authored: Tue Nov 29 13:57:30 2016 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Fri Dec 2 14:28:35 2016 +0100

----------------------------------------------------------------------
 .gitignore                                      |    2 +-
 flink-batch-connectors/flink-avro/pom.xml       |  216 --
 .../apache/flink/api/avro/DataInputDecoder.java |  213 --
 .../flink/api/avro/DataOutputEncoder.java       |  183 --
 .../api/avro/FSDataInputStreamWrapper.java      |   68 -
 .../flink/api/java/io/AvroInputFormat.java      |  207 --
 .../flink/api/java/io/AvroOutputFormat.java     |  189 --
 .../src/test/assembly/test-assembly.xml         |   36 -
 .../api/avro/AvroExternalJarProgramITCase.java  |   80 -
 .../flink/api/avro/AvroOutputFormatITCase.java  |  176 --
 .../flink/api/avro/EncoderDecoderTest.java      |  528 -----
 .../avro/testjar/AvroExternalJarProgram.java    |  219 --
 .../apache/flink/api/io/avro/AvroPojoTest.java  |  255 ---
 .../api/io/avro/AvroRecordInputFormatTest.java  |  458 ----
 .../io/avro/AvroSplittableInputFormatTest.java  |  326 ---
 .../api/io/avro/example/AvroTypeExample.java    |  108 -
 .../apache/flink/api/io/avro/example/User.java  |  269 ---
 .../io/AvroInputFormatTypeExtractionTest.java   |   81 -
 .../flink/api/java/io/AvroOutputFormatTest.java |  154 --
 .../src/test/resources/avro/user.avsc           |   35 -
 .../src/test/resources/log4j-test.properties    |   27 -
 .../src/test/resources/logback-test.xml         |   29 -
 .../flink-avro/src/test/resources/testdata.avro |  Bin 4572 -> 0 bytes
 .../flink-hadoop-compatibility/pom.xml          |  182 --
 .../api/java/typeutils/WritableTypeInfo.java    |  154 --
 .../typeutils/runtime/WritableComparator.java   |  188 --
 .../typeutils/runtime/WritableSerializer.java   |  152 --
 .../flink/hadoopcompatibility/HadoopInputs.java |  118 --
 .../flink/hadoopcompatibility/HadoopUtils.java  |   52 -
 .../mapred/HadoopMapFunction.java               |  133 --
 .../mapred/HadoopReduceCombineFunction.java     |  168 --
 .../mapred/HadoopReduceFunction.java            |  142 --
 .../mapred/wrapper/HadoopOutputCollector.java   |   59 -
 .../wrapper/HadoopTupleUnwrappingIterator.java  |   94 -
 .../scala/HadoopInputs.scala                    |  143 --
 .../java/typeutils/WritableExtractionTest.java  |  206 --
 .../java/typeutils/WritableInfoParserTest.java  |   84 -
 .../java/typeutils/WritableTypeInfoTest.java    |   72 -
 .../typeutils/runtime/StringArrayWritable.java  |   83 -
 .../runtime/WritableComparatorTest.java         |   53 -
 .../runtime/WritableComparatorUUIDTest.java     |   46 -
 .../api/java/typeutils/runtime/WritableID.java  |   78 -
 .../runtime/WritableSerializerTest.java         |   50 -
 .../runtime/WritableSerializerUUIDTest.java     |   50 -
 .../hadoopcompatibility/HadoopUtilsTest.java    |   34 -
 .../mapred/HadoopMapFunctionITCase.java         |  182 --
 .../mapred/HadoopMapredITCase.java              |   47 -
 .../HadoopReduceCombineFunctionITCase.java      |  265 ---
 .../mapred/HadoopReduceFunctionITCase.java      |  213 --
 .../mapred/HadoopTestData.java                  |   62 -
 .../example/HadoopMapredCompatWordCount.java    |  133 --
 .../HadoopTupleUnwrappingIteratorTest.java      |  139 --
 .../mapreduce/HadoopInputOutputITCase.java      |   47 -
 .../mapreduce/example/WordCount.java            |  119 --
 .../src/test/resources/log4j-test.properties    |   27 -
 .../src/test/resources/logback-test.xml         |   29 -
 flink-batch-connectors/flink-hbase/pom.xml      |  264 ---
 .../flink/addons/hbase/TableInputFormat.java    |  289 ---
 .../flink/addons/hbase/TableInputSplit.java     |   89 -
 .../hbase/HBaseTestingClusterAutostarter.java   |  238 ---
 .../addons/hbase/TableInputFormatITCase.java    |  120 --
 .../hbase/example/HBaseFlinkTestConstants.java  |   28 -
 .../addons/hbase/example/HBaseReadExample.java  |   92 -
 .../addons/hbase/example/HBaseWriteExample.java |  202 --
 .../hbase/example/HBaseWriteStreamExample.java  |  113 -
 .../src/test/resources/log4j-test.properties    |   23 -
 flink-batch-connectors/flink-hcatalog/pom.xml   |  182 --
 .../flink/hcatalog/HCatInputFormatBase.java     |  410 ----
 .../flink/hcatalog/java/HCatInputFormat.java    |  160 --
 .../flink/hcatalog/scala/HCatInputFormat.scala  |  229 --
 flink-batch-connectors/flink-jdbc/pom.xml       |   66 -
 .../flink/api/java/io/jdbc/JDBCInputFormat.java |  404 ----
 .../api/java/io/jdbc/JDBCOutputFormat.java      |  315 ---
 .../split/GenericParameterValuesProvider.java   |   44 -
 .../split/NumericBetweenParametersProvider.java |   66 -
 .../io/jdbc/split/ParameterValuesProvider.java  |   35 -
 .../flink/api/java/io/jdbc/JDBCFullTest.java    |  101 -
 .../api/java/io/jdbc/JDBCInputFormatTest.java   |  247 ---
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  169 --
 .../flink/api/java/io/jdbc/JDBCTestBase.java    |  183 --
 .../src/test/resources/log4j-test.properties    |   19 -
 .../src/test/resources/logback-test.xml         |   29 -
 flink-batch-connectors/pom.xml                  |   45 -
 flink-connectors/flink-avro/pom.xml             |  216 ++
 .../apache/flink/api/avro/DataInputDecoder.java |  213 ++
 .../flink/api/avro/DataOutputEncoder.java       |  183 ++
 .../api/avro/FSDataInputStreamWrapper.java      |   68 +
 .../flink/api/java/io/AvroInputFormat.java      |  207 ++
 .../flink/api/java/io/AvroOutputFormat.java     |  189 ++
 .../src/test/assembly/test-assembly.xml         |   36 +
 .../api/avro/AvroExternalJarProgramITCase.java  |   80 +
 .../flink/api/avro/AvroOutputFormatITCase.java  |  176 ++
 .../flink/api/avro/EncoderDecoderTest.java      |  528 +++++
 .../avro/testjar/AvroExternalJarProgram.java    |  219 ++
 .../apache/flink/api/io/avro/AvroPojoTest.java  |  255 +++
 .../api/io/avro/AvroRecordInputFormatTest.java  |  458 ++++
 .../io/avro/AvroSplittableInputFormatTest.java  |  326 +++
 .../api/io/avro/example/AvroTypeExample.java    |  108 +
 .../apache/flink/api/io/avro/example/User.java  |  269 +++
 .../io/AvroInputFormatTypeExtractionTest.java   |   81 +
 .../flink/api/java/io/AvroOutputFormatTest.java |  154 ++
 .../src/test/resources/avro/user.avsc           |   35 +
 .../src/test/resources/log4j-test.properties    |   27 +
 .../src/test/resources/logback-test.xml         |   29 +
 .../flink-avro/src/test/resources/testdata.avro |  Bin 0 -> 4572 bytes
 .../flink-connector-cassandra/pom.xml           |  179 ++
 .../cassandra/CassandraInputFormat.java         |  131 ++
 .../cassandra/CassandraOutputFormat.java        |  125 ++
 .../cassandra/CassandraCommitter.java           |  151 ++
 .../connectors/cassandra/CassandraPojoSink.java |   67 +
 .../connectors/cassandra/CassandraSink.java     |  329 +++
 .../connectors/cassandra/CassandraSinkBase.java |   98 +
 .../cassandra/CassandraTupleSink.java           |   59 +
 .../cassandra/CassandraTupleWriteAheadSink.java |  159 ++
 .../connectors/cassandra/ClusterBuilder.java    |   43 +
 .../cassandra/example/BatchExample.java         |   77 +
 .../cassandra/CassandraConnectorITCase.java     |  440 ++++
 .../CassandraTupleWriteAheadSinkTest.java       |  127 ++
 .../streaming/connectors/cassandra/Pojo.java    |   65 +
 .../example/CassandraPojoSinkExample.java       |   62 +
 .../example/CassandraTupleSinkExample.java      |   62 +
 .../CassandraTupleWriteAheadSinkExample.java    |   96 +
 .../connectors/cassandra/example/Message.java   |   56 +
 .../src/test/resources/cassandra.yaml           |   43 +
 .../src/test/resources/log4j-test.properties    |   29 +
 .../flink-connector-elasticsearch/pom.xml       |   90 +
 .../elasticsearch/ElasticsearchSink.java        |  315 +++
 .../elasticsearch/IndexRequestBuilder.java      |   66 +
 .../elasticsearch/ElasticsearchSinkITCase.java  |  205 ++
 .../examples/ElasticsearchExample.java          |   80 +
 .../src/test/resources/log4j-test.properties    |   27 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-elasticsearch2/pom.xml      |   83 +
 .../elasticsearch2/BulkProcessorIndexer.java    |   35 +
 .../elasticsearch2/ElasticsearchSink.java       |  257 +++
 .../ElasticsearchSinkFunction.java              |   60 +
 .../elasticsearch2/RequestIndexer.java          |   25 +
 .../elasticsearch2/ElasticsearchSinkITCase.java |  233 ++
 .../examples/ElasticsearchExample.java          |   90 +
 .../src/test/resources/log4j-test.properties    |   27 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-filesystem/pom.xml          |  163 ++
 .../connectors/fs/AvroKeyValueSinkWriter.java   |  309 +++
 .../flink/streaming/connectors/fs/Bucketer.java |   55 +
 .../flink/streaming/connectors/fs/Clock.java    |   33 +
 .../connectors/fs/DateTimeBucketer.java         |  126 ++
 .../connectors/fs/NonRollingBucketer.java       |   47 +
 .../streaming/connectors/fs/RollingSink.java    |  916 ++++++++
 .../connectors/fs/SequenceFileWriter.java       |  151 ++
 .../connectors/fs/StreamWriterBase.java         |  152 ++
 .../streaming/connectors/fs/StringWriter.java   |   86 +
 .../streaming/connectors/fs/SystemClock.java    |   29 +
 .../flink/streaming/connectors/fs/Writer.java   |   73 +
 .../fs/bucketing/BasePathBucketer.java          |   39 +
 .../connectors/fs/bucketing/Bucketer.java       |   47 +
 .../connectors/fs/bucketing/BucketingSink.java  | 1082 ++++++++++
 .../fs/bucketing/DateTimeBucketer.java          |  102 +
 .../src/main/resources/log4j.properties         |   27 +
 .../fs/RollingSinkFaultToleranceITCase.java     |  300 +++
 .../connectors/fs/RollingSinkITCase.java        |  991 +++++++++
 .../connectors/fs/RollingSinkSecuredITCase.java |  252 +++
 .../BucketingSinkFaultToleranceITCase.java      |  297 +++
 .../fs/bucketing/BucketingSinkTest.java         |  867 ++++++++
 .../src/test/resources/log4j-test.properties    |   29 +
 .../src/test/resources/logback-test.xml         |   30 +
 flink-connectors/flink-connector-flume/pom.xml  |  175 ++
 .../streaming/connectors/flume/FlumeSink.java   |  141 ++
 .../flink-connector-kafka-0.10/pom.xml          |  205 ++
 .../connectors/kafka/FlinkKafkaConsumer010.java |  153 ++
 .../connectors/kafka/FlinkKafkaProducer010.java |  398 ++++
 .../kafka/Kafka010JsonTableSource.java          |   71 +
 .../connectors/kafka/Kafka010TableSource.java   |   75 +
 .../kafka/internal/Kafka010Fetcher.java         |  104 +
 .../internal/KafkaConsumerCallBridge010.java    |   40 +
 .../src/main/resources/log4j.properties         |   29 +
 .../connectors/kafka/Kafka010FetcherTest.java   |  484 +++++
 .../connectors/kafka/Kafka010ITCase.java        |  313 +++
 .../kafka/Kafka010ProducerITCase.java           |   33 +
 .../kafka/KafkaTestEnvironmentImpl.java         |  420 ++++
 .../src/test/resources/log4j-test.properties    |   30 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-kafka-0.8/pom.xml           |  219 ++
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  398 ++++
 .../connectors/kafka/FlinkKafkaConsumer081.java |   39 +
 .../connectors/kafka/FlinkKafkaConsumer082.java |   39 +
 .../connectors/kafka/FlinkKafkaProducer.java    |   64 +
 .../connectors/kafka/FlinkKafkaProducer08.java  |  145 ++
 .../connectors/kafka/Kafka08JsonTableSink.java  |   52 +
 .../kafka/Kafka08JsonTableSource.java           |   71 +
 .../connectors/kafka/Kafka08TableSource.java    |   75 +
 .../kafka/internals/ClosableBlockingQueue.java  |  507 +++++
 .../kafka/internals/Kafka08Fetcher.java         |  481 +++++
 .../kafka/internals/KillerWatchDog.java         |   62 +
 .../kafka/internals/PartitionInfoFetcher.java   |   66 +
 .../internals/PeriodicOffsetCommitter.java      |   85 +
 .../kafka/internals/SimpleConsumerThread.java   |  504 +++++
 .../kafka/internals/ZookeeperOffsetHandler.java |  164 ++
 .../connectors/kafka/Kafka08ITCase.java         |  248 +++
 .../kafka/Kafka08JsonTableSinkTest.java         |   48 +
 .../kafka/Kafka08JsonTableSourceTest.java       |   45 +
 .../connectors/kafka/Kafka08ProducerITCase.java |   32 +
 .../connectors/kafka/KafkaConsumer08Test.java   |  139 ++
 .../connectors/kafka/KafkaLocalSystemTime.java  |   48 +
 .../connectors/kafka/KafkaProducerTest.java     |  123 ++
 .../kafka/KafkaShortRetention08ITCase.java      |   34 +
 .../kafka/KafkaTestEnvironmentImpl.java         |  401 ++++
 .../internals/ClosableBlockingQueueTest.java    |  603 ++++++
 .../src/test/resources/log4j-test.properties    |   30 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-kafka-0.9/pom.xml           |  212 ++
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  269 +++
 .../connectors/kafka/FlinkKafkaProducer09.java  |  137 ++
 .../connectors/kafka/Kafka09JsonTableSink.java  |   50 +
 .../kafka/Kafka09JsonTableSource.java           |   71 +
 .../connectors/kafka/Kafka09TableSource.java    |   75 +
 .../connectors/kafka/internal/Handover.java     |  214 ++
 .../kafka/internal/Kafka09Fetcher.java          |  241 +++
 .../kafka/internal/KafkaConsumerCallBridge.java |   41 +
 .../kafka/internal/KafkaConsumerThread.java     |  332 +++
 .../src/main/resources/log4j.properties         |   29 +
 .../connectors/kafka/Kafka09FetcherTest.java    |  482 +++++
 .../connectors/kafka/Kafka09ITCase.java         |  129 ++
 .../kafka/Kafka09JsonTableSinkTest.java         |   48 +
 .../kafka/Kafka09JsonTableSourceTest.java       |   45 +
 .../connectors/kafka/Kafka09ProducerITCase.java |   32 +
 .../kafka/Kafka09SecuredRunITCase.java          |   62 +
 .../connectors/kafka/KafkaProducerTest.java     |  126 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  439 ++++
 .../connectors/kafka/internal/HandoverTest.java |  387 ++++
 .../src/test/resources/log4j-test.properties    |   32 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-kafka-base/pom.xml          |  212 ++
 .../kafka/FlinkKafkaConsumerBase.java           |  552 +++++
 .../kafka/FlinkKafkaProducerBase.java           |  386 ++++
 .../connectors/kafka/KafkaJsonTableSink.java    |   47 +
 .../connectors/kafka/KafkaJsonTableSource.java  |   97 +
 .../connectors/kafka/KafkaTableSink.java        |  127 ++
 .../connectors/kafka/KafkaTableSource.java      |  155 ++
 .../kafka/internals/AbstractFetcher.java        |  552 +++++
 .../kafka/internals/ExceptionProxy.java         |  125 ++
 .../kafka/internals/KafkaTopicPartition.java    |  120 ++
 .../internals/KafkaTopicPartitionLeader.java    |   98 +
 .../internals/KafkaTopicPartitionState.java     |  118 ++
 ...picPartitionStateWithPeriodicWatermarks.java |   71 +
 ...cPartitionStateWithPunctuatedWatermarks.java |   84 +
 .../connectors/kafka/internals/TypeUtil.java    |   38 +
 .../internals/metrics/KafkaMetricWrapper.java   |   37 +
 .../kafka/partitioner/FixedPartitioner.java     |   76 +
 .../kafka/partitioner/KafkaPartitioner.java     |   41 +
 .../JSONDeserializationSchema.java              |   46 +
 .../JSONKeyValueDeserializationSchema.java      |   72 +
 .../JsonRowDeserializationSchema.java           |  135 ++
 .../JsonRowSerializationSchema.java             |   70 +
 .../KeyedDeserializationSchema.java             |   52 +
 .../KeyedDeserializationSchemaWrapper.java      |   51 +
 .../serialization/KeyedSerializationSchema.java |   55 +
 .../KeyedSerializationSchemaWrapper.java        |   48 +
 ...eInformationKeyValueSerializationSchema.java |  196 ++
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  416 ++++
 .../kafka/FlinkKafkaProducerBaseTest.java       |  288 +++
 .../kafka/JSONDeserializationSchemaTest.java    |   41 +
 .../JSONKeyValueDeserializationSchemaTest.java  |   68 +
 .../kafka/JsonRowDeserializationSchemaTest.java |  124 ++
 .../kafka/JsonRowSerializationSchemaTest.java   |   98 +
 .../KafkaConsumerPartitionAssignmentTest.java   |  269 +++
 .../connectors/kafka/KafkaConsumerTestBase.java | 2006 ++++++++++++++++++
 .../connectors/kafka/KafkaProducerTestBase.java |  193 ++
 .../kafka/KafkaShortRetentionTestBase.java      |  291 +++
 .../kafka/KafkaTableSinkTestBase.java           |  106 +
 .../kafka/KafkaTableSourceTestBase.java         |   77 +
 .../connectors/kafka/KafkaTestBase.java         |  203 ++
 .../connectors/kafka/KafkaTestEnvironment.java  |  112 +
 .../connectors/kafka/TestFixedPartitioner.java  |  104 +
 .../AbstractFetcherTimestampsTest.java          |  320 +++
 .../internals/KafkaTopicPartitionTest.java      |   57 +
 .../kafka/testutils/DataGenerators.java         |  227 ++
 .../kafka/testutils/FailingIdentityMapper.java  |  115 +
 .../testutils/FakeStandardProducerConfig.java   |   34 +
 .../testutils/JobManagerCommunicationUtils.java |  120 ++
 .../testutils/PartitionValidatingMapper.java    |   53 +
 .../kafka/testutils/ThrottledMapper.java        |   44 +
 .../kafka/testutils/Tuple2Partitioner.java      |   48 +
 .../testutils/ValidatingExactlyOnceSink.java    |   82 +
 .../testutils/ZooKeeperStringSerializer.java    |   51 +
 .../src/test/resources/log4j-test.properties    |   29 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-kinesis/pom.xml             |  164 ++
 .../kinesis/FlinkKinesisConsumer.java           |  304 +++
 .../kinesis/FlinkKinesisProducer.java           |  292 +++
 .../connectors/kinesis/KinesisPartitioner.java  |   49 +
 .../kinesis/config/AWSConfigConstants.java      |   70 +
 .../kinesis/config/ConsumerConfigConstants.java |  138 ++
 .../kinesis/config/ProducerConfigConstants.java |   33 +
 .../kinesis/examples/ConsumeFromKinesis.java    |   54 +
 .../kinesis/examples/ProduceIntoKinesis.java    |   77 +
 .../kinesis/internals/KinesisDataFetcher.java   |  679 ++++++
 .../kinesis/internals/ShardConsumer.java        |  287 +++
 .../kinesis/model/KinesisStreamShard.java       |  133 ++
 .../kinesis/model/KinesisStreamShardState.java  |   71 +
 .../kinesis/model/SentinelSequenceNumber.java   |   51 +
 .../kinesis/model/SequenceNumber.java           |  104 +
 .../kinesis/proxy/GetShardListResult.java       |   75 +
 .../connectors/kinesis/proxy/KinesisProxy.java  |  338 +++
 .../kinesis/proxy/KinesisProxyInterface.java    |   69 +
 .../KinesisDeserializationSchema.java           |   57 +
 .../KinesisDeserializationSchemaWrapper.java    |   57 +
 .../KinesisSerializationSchema.java             |   45 +
 .../connectors/kinesis/util/AWSUtil.java        |  130 ++
 .../kinesis/util/KinesisConfigUtil.java         |  218 ++
 .../src/main/resources/log4j.properties         |   27 +
 .../kinesis/FlinkKinesisConsumerTest.java       |  472 +++++
 .../internals/KinesisDataFetcherTest.java       |  510 +++++
 .../kinesis/internals/ShardConsumerTest.java    |  122 ++
 .../manualtests/ManualConsumerProducerTest.java |  121 ++
 .../manualtests/ManualExactlyOnceTest.java      |  147 ++
 ...nualExactlyOnceWithStreamReshardingTest.java |  247 +++
 .../kinesis/manualtests/ManualProducerTest.java |   91 +
 .../ExactlyOnceValidatingConsumerThread.java    |  155 ++
 .../testutils/FakeKinesisBehavioursFactory.java |  262 +++
 .../KinesisEventsGeneratorProducerThread.java   |  118 ++
 .../testutils/KinesisShardIdGenerator.java      |   25 +
 .../testutils/TestableFlinkKinesisConsumer.java |   60 +
 .../testutils/TestableKinesisDataFetcher.java   |  122 ++
 flink-connectors/flink-connector-nifi/pom.xml   |   89 +
 .../connectors/nifi/NiFiDataPacket.java         |   39 +
 .../connectors/nifi/NiFiDataPacketBuilder.java  |   34 +
 .../streaming/connectors/nifi/NiFiSink.java     |   74 +
 .../streaming/connectors/nifi/NiFiSource.java   |  155 ++
 .../connectors/nifi/StandardNiFiDataPacket.java |   46 +
 .../nifi/examples/NiFiSinkTopologyExample.java  |   55 +
 .../examples/NiFiSourceTopologyExample.java     |   58 +
 .../src/test/resources/NiFi_Flink.xml           |   16 +
 .../flink-connector-rabbitmq/pom.xml            |   60 +
 .../streaming/connectors/rabbitmq/RMQSink.java  |  142 ++
 .../connectors/rabbitmq/RMQSource.java          |  243 +++
 .../rabbitmq/common/RMQConnectionConfig.java    |  448 ++++
 .../connectors/rabbitmq/RMQSourceTest.java      |  419 ++++
 .../common/RMQConnectionConfigTest.java         |   69 +
 .../connectors/rabbitmq/common/RMQSinkTest.java |  125 ++
 flink-connectors/flink-connector-redis/pom.xml  |   79 +
 .../streaming/connectors/redis/RedisSink.java   |  188 ++
 .../common/config/FlinkJedisClusterConfig.java  |  187 ++
 .../common/config/FlinkJedisConfigBase.java     |   90 +
 .../common/config/FlinkJedisPoolConfig.java     |  224 ++
 .../common/config/FlinkJedisSentinelConfig.java |  259 +++
 .../common/container/RedisClusterContainer.java |  171 ++
 .../container/RedisCommandsContainer.java       |  115 +
 .../RedisCommandsContainerBuilder.java          |  116 +
 .../redis/common/container/RedisContainer.java  |  252 +++
 .../redis/common/mapper/RedisCommand.java       |   86 +
 .../common/mapper/RedisCommandDescription.java  |   94 +
 .../redis/common/mapper/RedisDataType.java      |   66 +
 .../redis/common/mapper/RedisMapper.java        |   66 +
 .../connectors/redis/RedisITCaseBase.java       |   45 +
 .../redis/RedisSentinelClusterTest.java         |  100 +
 .../connectors/redis/RedisSinkITCase.java       |  233 ++
 .../redis/RedisSinkPublishITCase.java           |  137 ++
 .../connectors/redis/RedisSinkTest.java         |  144 ++
 .../common/config/FlinkJedisConfigBaseTest.java |   50 +
 .../common/config/JedisClusterConfigTest.java   |   49 +
 .../common/config/JedisPoolConfigTest.java      |   29 +
 .../common/config/JedisSentinelConfigTest.java  |   49 +
 .../mapper/RedisDataTypeDescriptionTest.java    |   41 +
 .../flink-connector-twitter/pom.xml             |   96 +
 .../connectors/twitter/TwitterSource.java       |  217 ++
 .../flink-hadoop-compatibility/pom.xml          |  182 ++
 .../api/java/typeutils/WritableTypeInfo.java    |  154 ++
 .../typeutils/runtime/WritableComparator.java   |  188 ++
 .../typeutils/runtime/WritableSerializer.java   |  152 ++
 .../flink/hadoopcompatibility/HadoopInputs.java |  118 ++
 .../flink/hadoopcompatibility/HadoopUtils.java  |   52 +
 .../mapred/HadoopMapFunction.java               |  133 ++
 .../mapred/HadoopReduceCombineFunction.java     |  168 ++
 .../mapred/HadoopReduceFunction.java            |  142 ++
 .../mapred/wrapper/HadoopOutputCollector.java   |   59 +
 .../wrapper/HadoopTupleUnwrappingIterator.java  |   94 +
 .../scala/HadoopInputs.scala                    |  143 ++
 .../java/typeutils/WritableExtractionTest.java  |  206 ++
 .../java/typeutils/WritableInfoParserTest.java  |   84 +
 .../java/typeutils/WritableTypeInfoTest.java    |   72 +
 .../typeutils/runtime/StringArrayWritable.java  |   83 +
 .../runtime/WritableComparatorTest.java         |   53 +
 .../runtime/WritableComparatorUUIDTest.java     |   46 +
 .../api/java/typeutils/runtime/WritableID.java  |   78 +
 .../runtime/WritableSerializerTest.java         |   50 +
 .../runtime/WritableSerializerUUIDTest.java     |   50 +
 .../hadoopcompatibility/HadoopUtilsTest.java    |   34 +
 .../mapred/HadoopMapFunctionITCase.java         |  182 ++
 .../mapred/HadoopMapredITCase.java              |   47 +
 .../HadoopReduceCombineFunctionITCase.java      |  265 +++
 .../mapred/HadoopReduceFunctionITCase.java      |  213 ++
 .../mapred/HadoopTestData.java                  |   62 +
 .../example/HadoopMapredCompatWordCount.java    |  133 ++
 .../HadoopTupleUnwrappingIteratorTest.java      |  139 ++
 .../mapreduce/HadoopInputOutputITCase.java      |   47 +
 .../mapreduce/example/WordCount.java            |  119 ++
 .../src/test/resources/log4j-test.properties    |   27 +
 .../src/test/resources/logback-test.xml         |   29 +
 flink-connectors/flink-hbase/pom.xml            |  264 +++
 .../flink/addons/hbase/TableInputFormat.java    |  289 +++
 .../flink/addons/hbase/TableInputSplit.java     |   89 +
 .../hbase/HBaseTestingClusterAutostarter.java   |  238 +++
 .../addons/hbase/TableInputFormatITCase.java    |  120 ++
 .../hbase/example/HBaseFlinkTestConstants.java  |   28 +
 .../addons/hbase/example/HBaseReadExample.java  |   92 +
 .../addons/hbase/example/HBaseWriteExample.java |  202 ++
 .../hbase/example/HBaseWriteStreamExample.java  |  113 +
 .../src/test/resources/log4j-test.properties    |   23 +
 flink-connectors/flink-hcatalog/pom.xml         |  182 ++
 .../flink/hcatalog/HCatInputFormatBase.java     |  410 ++++
 .../flink/hcatalog/java/HCatInputFormat.java    |  160 ++
 .../flink/hcatalog/scala/HCatInputFormat.scala  |  229 ++
 flink-connectors/flink-jdbc/pom.xml             |   66 +
 .../flink/api/java/io/jdbc/JDBCInputFormat.java |  404 ++++
 .../api/java/io/jdbc/JDBCOutputFormat.java      |  315 +++
 .../split/GenericParameterValuesProvider.java   |   44 +
 .../split/NumericBetweenParametersProvider.java |   66 +
 .../io/jdbc/split/ParameterValuesProvider.java  |   35 +
 .../flink/api/java/io/jdbc/JDBCFullTest.java    |  101 +
 .../api/java/io/jdbc/JDBCInputFormatTest.java   |  247 +++
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  169 ++
 .../flink/api/java/io/jdbc/JDBCTestBase.java    |  183 ++
 .../src/test/resources/log4j-test.properties    |   19 +
 .../src/test/resources/logback-test.xml         |   29 +
 flink-connectors/pom.xml                        |   75 +
 .../flink-connector-cassandra/pom.xml           |  179 --
 .../cassandra/CassandraInputFormat.java         |  131 --
 .../cassandra/CassandraOutputFormat.java        |  125 --
 .../cassandra/CassandraCommitter.java           |  151 --
 .../connectors/cassandra/CassandraPojoSink.java |   67 -
 .../connectors/cassandra/CassandraSink.java     |  329 ---
 .../connectors/cassandra/CassandraSinkBase.java |   98 -
 .../cassandra/CassandraTupleSink.java           |   59 -
 .../cassandra/CassandraTupleWriteAheadSink.java |  159 --
 .../connectors/cassandra/ClusterBuilder.java    |   43 -
 .../cassandra/example/BatchExample.java         |   77 -
 .../cassandra/CassandraConnectorITCase.java     |  440 ----
 .../CassandraTupleWriteAheadSinkTest.java       |  127 --
 .../streaming/connectors/cassandra/Pojo.java    |   65 -
 .../example/CassandraPojoSinkExample.java       |   62 -
 .../example/CassandraTupleSinkExample.java      |   62 -
 .../CassandraTupleWriteAheadSinkExample.java    |   96 -
 .../connectors/cassandra/example/Message.java   |   56 -
 .../src/test/resources/cassandra.yaml           |   43 -
 .../src/test/resources/log4j-test.properties    |   29 -
 .../flink-connector-elasticsearch/pom.xml       |   90 -
 .../elasticsearch/ElasticsearchSink.java        |  315 ---
 .../elasticsearch/IndexRequestBuilder.java      |   66 -
 .../elasticsearch/ElasticsearchSinkITCase.java  |  205 --
 .../examples/ElasticsearchExample.java          |   80 -
 .../src/test/resources/log4j-test.properties    |   27 -
 .../src/test/resources/logback-test.xml         |   30 -
 .../flink-connector-elasticsearch2/pom.xml      |   83 -
 .../elasticsearch2/BulkProcessorIndexer.java    |   35 -
 .../elasticsearch2/ElasticsearchSink.java       |  257 ---
 .../ElasticsearchSinkFunction.java              |   60 -
 .../elasticsearch2/RequestIndexer.java          |   25 -
 .../elasticsearch2/ElasticsearchSinkITCase.java |  233 --
 .../examples/ElasticsearchExample.java          |   90 -
 .../src/test/resources/log4j-test.properties    |   27 -
 .../src/test/resources/logback-test.xml         |   30 -
 .../flink-connector-filesystem/pom.xml          |  163 --
 .../connectors/fs/AvroKeyValueSinkWriter.java   |  309 ---
 .../flink/streaming/connectors/fs/Bucketer.java |   55 -
 .../flink/streaming/connectors/fs/Clock.java    |   33 -
 .../connectors/fs/DateTimeBucketer.java         |  126 --
 .../connectors/fs/NonRollingBucketer.java       |   47 -
 .../streaming/connectors/fs/RollingSink.java    |  916 --------
 .../connectors/fs/SequenceFileWriter.java       |  151 --
 .../connectors/fs/StreamWriterBase.java         |  152 --
 .../streaming/connectors/fs/StringWriter.java   |   86 -
 .../streaming/connectors/fs/SystemClock.java    |   29 -
 .../flink/streaming/connectors/fs/Writer.java   |   73 -
 .../fs/bucketing/BasePathBucketer.java          |   39 -
 .../connectors/fs/bucketing/Bucketer.java       |   47 -
 .../connectors/fs/bucketing/BucketingSink.java  | 1082 ----------
 .../fs/bucketing/DateTimeBucketer.java          |  102 -
 .../src/main/resources/log4j.properties         |   27 -
 .../fs/RollingSinkFaultToleranceITCase.java     |  300 ---
 .../connectors/fs/RollingSinkITCase.java        |  991 ---------
 .../connectors/fs/RollingSinkSecuredITCase.java |  252 ---
 .../BucketingSinkFaultToleranceITCase.java      |  297 ---
 .../fs/bucketing/BucketingSinkTest.java         |  867 --------
 .../src/test/resources/log4j-test.properties    |   29 -
 .../src/test/resources/logback-test.xml         |   30 -
 .../flink-connector-flume/pom.xml               |  175 --
 .../streaming/connectors/flume/FlumeSink.java   |  141 --
 .../flink-connector-kafka-0.10/pom.xml          |  205 --
 .../connectors/kafka/FlinkKafkaConsumer010.java |  153 --
 .../connectors/kafka/FlinkKafkaProducer010.java |  398 ----
 .../kafka/Kafka010JsonTableSource.java          |   71 -
 .../connectors/kafka/Kafka010TableSource.java   |   75 -
 .../kafka/internal/Kafka010Fetcher.java         |  104 -
 .../internal/KafkaConsumerCallBridge010.java    |   40 -
 .../src/main/resources/log4j.properties         |   29 -
 .../connectors/kafka/Kafka010FetcherTest.java   |  484 -----
 .../connectors/kafka/Kafka010ITCase.java        |  313 ---
 .../kafka/Kafka010ProducerITCase.java           |   33 -
 .../kafka/KafkaTestEnvironmentImpl.java         |  420 ----
 .../src/test/resources/log4j-test.properties    |   30 -
 .../src/test/resources/logback-test.xml         |   30 -
 .../flink-connector-kafka-0.8/pom.xml           |  219 --
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  398 ----
 .../connectors/kafka/FlinkKafkaConsumer081.java |   39 -
 .../connectors/kafka/FlinkKafkaConsumer082.java |   39 -
 .../connectors/kafka/FlinkKafkaProducer.java    |   64 -
 .../connectors/kafka/FlinkKafkaProducer08.java  |  145 --
 .../connectors/kafka/Kafka08JsonTableSink.java  |   52 -
 .../kafka/Kafka08JsonTableSource.java           |   71 -
 .../connectors/kafka/Kafka08TableSource.java    |   75 -
 .../kafka/internals/ClosableBlockingQueue.java  |  507 -----
 .../kafka/internals/Kafka08Fetcher.java         |  481 -----
 .../kafka/internals/KillerWatchDog.java         |   62 -
 .../kafka/internals/PartitionInfoFetcher.java   |   66 -
 .../internals/PeriodicOffsetCommitter.java      |   85 -
 .../kafka/internals/SimpleConsumerThread.java   |  504 -----
 .../kafka/internals/ZookeeperOffsetHandler.java |  164 --
 .../connectors/kafka/Kafka08ITCase.java         |  248 ---
 .../kafka/Kafka08JsonTableSinkTest.java         |   48 -
 .../kafka/Kafka08JsonTableSourceTest.java       |   45 -
 .../connectors/kafka/Kafka08ProducerITCase.java |   32 -
 .../connectors/kafka/KafkaConsumer08Test.java   |  139 --
 .../connectors/kafka/KafkaLocalSystemTime.java  |   48 -
 .../connectors/kafka/KafkaProducerTest.java     |  123 --
 .../kafka/KafkaShortRetention08ITCase.java      |   34 -
 .../kafka/KafkaTestEnvironmentImpl.java         |  401 ----
 .../internals/ClosableBlockingQueueTest.java    |  603 ------
 .../src/test/resources/log4j-test.properties    |   30 -
 .../src/test/resources/logback-test.xml         |   30 -
 .../flink-connector-kafka-0.9/pom.xml           |  212 --
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  269 ---
 .../connectors/kafka/FlinkKafkaProducer09.java  |  137 --
 .../connectors/kafka/Kafka09JsonTableSink.java  |   50 -
 .../kafka/Kafka09JsonTableSource.java           |   71 -
 .../connectors/kafka/Kafka09TableSource.java    |   75 -
 .../connectors/kafka/internal/Handover.java     |  214 --
 .../kafka/internal/Kafka09Fetcher.java          |  241 ---
 .../kafka/internal/KafkaConsumerCallBridge.java |   41 -
 .../kafka/internal/KafkaConsumerThread.java     |  332 ---
 .../src/main/resources/log4j.properties         |   29 -
 .../connectors/kafka/Kafka09FetcherTest.java    |  482 -----
 .../connectors/kafka/Kafka09ITCase.java         |  129 --
 .../kafka/Kafka09JsonTableSinkTest.java         |   48 -
 .../kafka/Kafka09JsonTableSourceTest.java       |   45 -
 .../connectors/kafka/Kafka09ProducerITCase.java |   32 -
 .../kafka/Kafka09SecuredRunITCase.java          |   62 -
 .../connectors/kafka/KafkaProducerTest.java     |  126 --
 .../kafka/KafkaTestEnvironmentImpl.java         |  439 ----
 .../connectors/kafka/internal/HandoverTest.java |  387 ----
 .../src/test/resources/log4j-test.properties    |   32 -
 .../src/test/resources/logback-test.xml         |   30 -
 .../flink-connector-kafka-base/pom.xml          |  212 --
 .../kafka/FlinkKafkaConsumerBase.java           |  552 -----
 .../kafka/FlinkKafkaProducerBase.java           |  386 ----
 .../connectors/kafka/KafkaJsonTableSink.java    |   47 -
 .../connectors/kafka/KafkaJsonTableSource.java  |   97 -
 .../connectors/kafka/KafkaTableSink.java        |  127 --
 .../connectors/kafka/KafkaTableSource.java      |  155 --
 .../kafka/internals/AbstractFetcher.java        |  552 -----
 .../kafka/internals/ExceptionProxy.java         |  125 --
 .../kafka/internals/KafkaTopicPartition.java    |  120 --
 .../internals/KafkaTopicPartitionLeader.java    |   98 -
 .../internals/KafkaTopicPartitionState.java     |  118 --
 ...picPartitionStateWithPeriodicWatermarks.java |   71 -
 ...cPartitionStateWithPunctuatedWatermarks.java |   84 -
 .../connectors/kafka/internals/TypeUtil.java    |   38 -
 .../internals/metrics/KafkaMetricWrapper.java   |   37 -
 .../kafka/partitioner/FixedPartitioner.java     |   76 -
 .../kafka/partitioner/KafkaPartitioner.java     |   41 -
 .../JSONDeserializationSchema.java              |   46 -
 .../JSONKeyValueDeserializationSchema.java      |   72 -
 .../JsonRowDeserializationSchema.java           |  135 --
 .../JsonRowSerializationSchema.java             |   70 -
 .../KeyedDeserializationSchema.java             |   52 -
 .../KeyedDeserializationSchemaWrapper.java      |   51 -
 .../serialization/KeyedSerializationSchema.java |   55 -
 .../KeyedSerializationSchemaWrapper.java        |   48 -
 ...eInformationKeyValueSerializationSchema.java |  196 --
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  416 ----
 .../kafka/FlinkKafkaProducerBaseTest.java       |  288 ---
 .../kafka/JSONDeserializationSchemaTest.java    |   41 -
 .../JSONKeyValueDeserializationSchemaTest.java  |   68 -
 .../kafka/JsonRowDeserializationSchemaTest.java |  124 --
 .../kafka/JsonRowSerializationSchemaTest.java   |   98 -
 .../KafkaConsumerPartitionAssignmentTest.java   |  269 ---
 .../connectors/kafka/KafkaConsumerTestBase.java | 2006 ------------------
 .../connectors/kafka/KafkaProducerTestBase.java |  193 --
 .../kafka/KafkaShortRetentionTestBase.java      |  291 ---
 .../kafka/KafkaTableSinkTestBase.java           |  106 -
 .../kafka/KafkaTableSourceTestBase.java         |   77 -
 .../connectors/kafka/KafkaTestBase.java         |  203 --
 .../connectors/kafka/KafkaTestEnvironment.java  |  112 -
 .../connectors/kafka/TestFixedPartitioner.java  |  104 -
 .../AbstractFetcherTimestampsTest.java          |  320 ---
 .../internals/KafkaTopicPartitionTest.java      |   57 -
 .../kafka/testutils/DataGenerators.java         |  227 --
 .../kafka/testutils/FailingIdentityMapper.java  |  115 -
 .../testutils/FakeStandardProducerConfig.java   |   34 -
 .../testutils/JobManagerCommunicationUtils.java |  120 --
 .../testutils/PartitionValidatingMapper.java    |   53 -
 .../kafka/testutils/ThrottledMapper.java        |   44 -
 .../kafka/testutils/Tuple2Partitioner.java      |   48 -
 .../testutils/ValidatingExactlyOnceSink.java    |   82 -
 .../testutils/ZooKeeperStringSerializer.java    |   51 -
 .../src/test/resources/log4j-test.properties    |   29 -
 .../src/test/resources/logback-test.xml         |   30 -
 .../flink-connector-kinesis/pom.xml             |  164 --
 .../kinesis/FlinkKinesisConsumer.java           |  304 ---
 .../kinesis/FlinkKinesisProducer.java           |  292 ---
 .../connectors/kinesis/KinesisPartitioner.java  |   49 -
 .../kinesis/config/AWSConfigConstants.java      |   70 -
 .../kinesis/config/ConsumerConfigConstants.java |  138 --
 .../kinesis/config/ProducerConfigConstants.java |   33 -
 .../kinesis/examples/ConsumeFromKinesis.java    |   54 -
 .../kinesis/examples/ProduceIntoKinesis.java    |   77 -
 .../kinesis/internals/KinesisDataFetcher.java   |  679 ------
 .../kinesis/internals/ShardConsumer.java        |  287 ---
 .../kinesis/model/KinesisStreamShard.java       |  133 --
 .../kinesis/model/KinesisStreamShardState.java  |   71 -
 .../kinesis/model/SentinelSequenceNumber.java   |   51 -
 .../kinesis/model/SequenceNumber.java           |  104 -
 .../kinesis/proxy/GetShardListResult.java       |   75 -
 .../connectors/kinesis/proxy/KinesisProxy.java  |  338 ---
 .../kinesis/proxy/KinesisProxyInterface.java    |   69 -
 .../KinesisDeserializationSchema.java           |   57 -
 .../KinesisDeserializationSchemaWrapper.java    |   57 -
 .../KinesisSerializationSchema.java             |   45 -
 .../connectors/kinesis/util/AWSUtil.java        |  130 --
 .../kinesis/util/KinesisConfigUtil.java         |  218 --
 .../src/main/resources/log4j.properties         |   27 -
 .../kinesis/FlinkKinesisConsumerTest.java       |  472 -----
 .../internals/KinesisDataFetcherTest.java       |  510 -----
 .../kinesis/internals/ShardConsumerTest.java    |  122 --
 .../manualtests/ManualConsumerProducerTest.java |  121 --
 .../manualtests/ManualExactlyOnceTest.java      |  147 --
 ...nualExactlyOnceWithStreamReshardingTest.java |  247 ---
 .../kinesis/manualtests/ManualProducerTest.java |   91 -
 .../ExactlyOnceValidatingConsumerThread.java    |  155 --
 .../testutils/FakeKinesisBehavioursFactory.java |  262 ---
 .../KinesisEventsGeneratorProducerThread.java   |  118 --
 .../testutils/KinesisShardIdGenerator.java      |   25 -
 .../testutils/TestableFlinkKinesisConsumer.java |   60 -
 .../testutils/TestableKinesisDataFetcher.java   |  122 --
 .../flink-connector-nifi/pom.xml                |   89 -
 .../connectors/nifi/NiFiDataPacket.java         |   39 -
 .../connectors/nifi/NiFiDataPacketBuilder.java  |   34 -
 .../streaming/connectors/nifi/NiFiSink.java     |   74 -
 .../streaming/connectors/nifi/NiFiSource.java   |  155 --
 .../connectors/nifi/StandardNiFiDataPacket.java |   46 -
 .../nifi/examples/NiFiSinkTopologyExample.java  |   55 -
 .../examples/NiFiSourceTopologyExample.java     |   58 -
 .../src/test/resources/NiFi_Flink.xml           |   16 -
 .../flink-connector-rabbitmq/pom.xml            |   60 -
 .../streaming/connectors/rabbitmq/RMQSink.java  |  142 --
 .../connectors/rabbitmq/RMQSource.java          |  243 ---
 .../rabbitmq/common/RMQConnectionConfig.java    |  448 ----
 .../connectors/rabbitmq/RMQSourceTest.java      |  419 ----
 .../common/RMQConnectionConfigTest.java         |   69 -
 .../connectors/rabbitmq/common/RMQSinkTest.java |  125 --
 .../flink-connector-redis/pom.xml               |   79 -
 .../streaming/connectors/redis/RedisSink.java   |  188 --
 .../common/config/FlinkJedisClusterConfig.java  |  187 --
 .../common/config/FlinkJedisConfigBase.java     |   90 -
 .../common/config/FlinkJedisPoolConfig.java     |  224 --
 .../common/config/FlinkJedisSentinelConfig.java |  259 ---
 .../common/container/RedisClusterContainer.java |  171 --
 .../container/RedisCommandsContainer.java       |  115 -
 .../RedisCommandsContainerBuilder.java          |  116 -
 .../redis/common/container/RedisContainer.java  |  252 ---
 .../redis/common/mapper/RedisCommand.java       |   86 -
 .../common/mapper/RedisCommandDescription.java  |   94 -
 .../redis/common/mapper/RedisDataType.java      |   66 -
 .../redis/common/mapper/RedisMapper.java        |   66 -
 .../connectors/redis/RedisITCaseBase.java       |   45 -
 .../redis/RedisSentinelClusterTest.java         |  100 -
 .../connectors/redis/RedisSinkITCase.java       |  233 --
 .../redis/RedisSinkPublishITCase.java           |  137 --
 .../connectors/redis/RedisSinkTest.java         |  144 --
 .../common/config/FlinkJedisConfigBaseTest.java |   50 -
 .../common/config/JedisClusterConfigTest.java   |   49 -
 .../common/config/JedisPoolConfigTest.java      |   29 -
 .../common/config/JedisSentinelConfigTest.java  |   49 -
 .../mapper/RedisDataTypeDescriptionTest.java    |   41 -
 .../flink-connector-twitter/pom.xml             |   96 -
 .../connectors/twitter/TwitterSource.java       |  217 --
 flink-streaming-connectors/pom.xml              |   70 -
 pom.xml                                         |    9 +-
 687 files changed, 53344 insertions(+), 53385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index e18629d..6cbccc1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,7 +17,7 @@ tmp
 *.log
 .DS_Store
 build-target
-flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/
+flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/
 flink-runtime-web/web-dashboard/assets/fonts/
 flink-runtime-web/web-dashboard/node_modules/
 flink-runtime-web/web-dashboard/bower_components/

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/pom.xml 
b/flink-batch-connectors/flink-avro/pom.xml
deleted file mode 100644
index 1161173..0000000
--- a/flink-batch-connectors/flink-avro/pom.xml
+++ /dev/null
@@ -1,216 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-       
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-batch-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-avro_2.10</artifactId>
-       <name>flink-avro</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-
-               <!-- core dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.avro</groupId>
-                       <artifactId>avro</artifactId>
-                       <!-- version is derived from base module -->
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils-junit</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-clients_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-assembly-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>create-test-dependency</id>
-                                               
<phase>process-test-classes</phase>
-                                               <goals>
-                                                       <goal>single</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                               <manifest>
-                                                                       
<mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
-                                                               </manifest>
-                                                       </archive>
-                                                       
<finalName>maven</finalName>
-                                                       <attach>false</attach>
-                                                       <descriptors>
-                                                               
<descriptor>src/test/assembly/test-assembly.xml</descriptor>
-                                                       </descriptors>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <!--Remove the AvroExternalJarProgram code from the 
test-classes directory since it musn't be in the
-                       classpath when running the tests to actually test 
whether the user code class loader
-                       is properly used.-->
-                       <plugin>
-                               <artifactId>maven-clean-plugin</artifactId>
-                               <version>2.5</version><!--$NO-MVN-MAN-VER$-->
-                               <executions>
-                                       <execution>
-                                               
<id>remove-avroexternalprogram</id>
-                                               
<phase>process-test-classes</phase>
-                                               <goals>
-                                                       <goal>clean</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<excludeDefaultDirectories>true</excludeDefaultDirectories>
-                                                       <filesets>
-                                                               <fileset>
-                                                                       
<directory>${project.build.testOutputDirectory}</directory>
-                                                                       
<includes>
-                                                                               
<include>**/testjar/*.class</include>
-                                                                       
</includes>
-                                                               </fileset>
-                                                       </filesets>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <!-- Generate Test class from avro schema -->
-                       <plugin>
-                               <groupId>org.apache.avro</groupId>
-                               <artifactId>avro-maven-plugin</artifactId>
-                               <version>1.7.7</version>
-                               <executions>
-                                       <execution>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>schema</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
-                                                       
<testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-               
-               <pluginManagement>
-                       <plugins>
-                               <!--This plugin's configuration is used to 
store Eclipse m2e settings only. It has no influence on the Maven build 
itself.-->
-                               <plugin>
-                                       <groupId>org.eclipse.m2e</groupId>
-                                       
<artifactId>lifecycle-mapping</artifactId>
-                                       <version>1.0.0</version>
-                                       <configuration>
-                                               <lifecycleMappingMetadata>
-                                                       <pluginExecutions>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.maven.plugins</groupId>
-                                                                               
<artifactId>maven-assembly-plugin</artifactId>
-                                                                               
<versionRange>[2.4,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>single</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.maven.plugins</groupId>
-                                                                               
<artifactId>maven-clean-plugin</artifactId>
-                                                                               
<versionRange>[1,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>clean</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.avro</groupId>
-                                                                               
<artifactId>avro-maven-plugin</artifactId>
-                                                                               
<versionRange>[1.7.7,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>schema</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                       </pluginExecutions>
-                                               </lifecycleMappingMetadata>
-                                       </configuration>
-                               </plugin>
-                       </plugins>
-               </pluginManagement>
-       </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
deleted file mode 100644
index 59da4cb..0000000
--- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-
-
-public class DataInputDecoder extends Decoder {
-       
-       private final Utf8 stringDecoder = new Utf8();
-       
-       private DataInput in;
-       
-       public void setIn(DataInput in) {
-               this.in = in;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // primitives
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void readNull() {}
-       
-
-       @Override
-       public boolean readBoolean() throws IOException {
-               return in.readBoolean();
-       }
-
-       @Override
-       public int readInt() throws IOException {
-               return in.readInt();
-       }
-
-       @Override
-       public long readLong() throws IOException {
-               return in.readLong();
-       }
-
-       @Override
-       public float readFloat() throws IOException {
-               return in.readFloat();
-       }
-
-       @Override
-       public double readDouble() throws IOException {
-               return in.readDouble();
-       }
-       
-       @Override
-       public int readEnum() throws IOException {
-               return readInt();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // bytes
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void readFixed(byte[] bytes, int start, int length) throws 
IOException {
-               in.readFully(bytes, start, length);
-       }
-       
-       @Override
-       public ByteBuffer readBytes(ByteBuffer old) throws IOException {
-               int length = readInt();
-               ByteBuffer result;
-               if (old != null && length <= old.capacity() && old.hasArray()) {
-                       result = old;
-                       result.clear();
-               } else {
-                       result = ByteBuffer.allocate(length);
-               }
-               in.readFully(result.array(), result.arrayOffset() + 
result.position(), length);
-               result.limit(length);
-               return result;
-       }
-       
-       
-       @Override
-       public void skipFixed(int length) throws IOException {
-               skipBytes(length);
-       }
-       
-       @Override
-       public void skipBytes() throws IOException {
-               int num = readInt();
-               skipBytes(num);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // strings
-       // 
--------------------------------------------------------------------------------------------
-       
-       
-       @Override
-       public Utf8 readString(Utf8 old) throws IOException {
-               int length = readInt();
-               Utf8 result = (old != null ? old : new Utf8());
-               result.setByteLength(length);
-               
-               if (length > 0) {
-                       in.readFully(result.getBytes(), 0, length);
-               }
-               
-               return result;
-       }
-
-       @Override
-       public String readString() throws IOException {
-               return readString(stringDecoder).toString();
-       }
-
-       @Override
-       public void skipString() throws IOException {
-               int len = readInt();
-               skipBytes(len);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // collection types
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public long readArrayStart() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long arrayNext() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long skipArray() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long readMapStart() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long mapNext() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long skipMap() throws IOException {
-               return readVarLongCount(in);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // union
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public int readIndex() throws IOException {
-               return readInt();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // utils
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void skipBytes(int num) throws IOException {
-               while (num > 0) {
-                       num -= in.skipBytes(num);
-               }
-       }
-       
-       public static long readVarLongCount(DataInput in) throws IOException {
-               long value = in.readUnsignedByte();
-
-               if ((value & 0x80) == 0) {
-                       return value;
-               }
-               else {
-                       long curr;
-                       int shift = 7;
-                       value = value & 0x7f;
-                       while (((curr = in.readUnsignedByte()) & 0x80) != 0){
-                               value |= (curr & 0x7f) << shift;
-                               shift += 7;
-                       }
-                       value |= curr << shift;
-                       return value;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
deleted file mode 100644
index 0102cc1..0000000
--- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-
-
-public final class DataOutputEncoder extends Encoder implements 
java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private DataOutput out;
-       
-       
-       public void setOut(DataOutput out) {
-               this.out = out;
-       }
-
-
-       @Override
-       public void flush() throws IOException {}
-
-       // 
--------------------------------------------------------------------------------------------
-       // primitives
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void writeNull() {}
-       
-
-       @Override
-       public void writeBoolean(boolean b) throws IOException {
-               out.writeBoolean(b);
-       }
-
-       @Override
-       public void writeInt(int n) throws IOException {
-               out.writeInt(n);
-       }
-
-       @Override
-       public void writeLong(long n) throws IOException {
-               out.writeLong(n);
-       }
-
-       @Override
-       public void writeFloat(float f) throws IOException {
-               out.writeFloat(f);
-       }
-
-       @Override
-       public void writeDouble(double d) throws IOException {
-               out.writeDouble(d);
-       }
-       
-       @Override
-       public void writeEnum(int e) throws IOException {
-               out.writeInt(e);
-       }
-       
-       
-       // 
--------------------------------------------------------------------------------------------
-       // bytes
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeFixed(byte[] bytes, int start, int len) throws 
IOException {
-               out.write(bytes, start, len);
-       }
-       
-       @Override
-       public void writeBytes(byte[] bytes, int start, int len) throws 
IOException {
-               out.writeInt(len);
-               if (len > 0) {
-                       out.write(bytes, start, len);
-               }
-       }
-       
-       @Override
-       public void writeBytes(ByteBuffer bytes) throws IOException {
-               int num = bytes.remaining();
-               out.writeInt(num);
-               
-               if (num > 0) {
-                       writeFixed(bytes);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // strings
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeString(String str) throws IOException {
-               byte[] bytes = Utf8.getBytesFor(str);
-               writeBytes(bytes, 0, bytes.length);
-       }
-       
-       @Override
-       public void writeString(Utf8 utf8) throws IOException {
-               writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-               
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // collection types
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeArrayStart() {}
-
-       @Override
-       public void setItemCount(long itemCount) throws IOException {
-               if (itemCount > 0) {
-                       writeVarLongCount(out, itemCount);
-               }
-       }
-
-       @Override
-       public void startItem() {}
-
-       @Override
-       public void writeArrayEnd() throws IOException {
-               // write a single byte 0, shortcut for a var-length long of 0
-               out.write(0);
-       }
-
-       @Override
-       public void writeMapStart() {}
-
-       @Override
-       public void writeMapEnd() throws IOException {
-               // write a single byte 0, shortcut for a var-length long of 0
-               out.write(0);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // union
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void writeIndex(int unionIndex) throws IOException {
-               out.writeInt(unionIndex);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // utils
-       // 
--------------------------------------------------------------------------------------------
-               
-       
-       public static void writeVarLongCount(DataOutput out, long val) throws 
IOException {
-               if (val < 0) {
-                       throw new IOException("Illegal count (must be 
non-negative): " + val);
-               }
-               
-               while ((val & ~0x7FL) != 0) {
-                       out.write(((int) val) | 0x80);
-                       val >>>= 7;
-               }
-               out.write((int) val);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
deleted file mode 100644
index 709c4f1..0000000
--- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.avro.file.SeekableInput;
-import org.apache.flink.core.fs.FSDataInputStream;
-
-
-/**
- * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache 
licensed as well)
- * 
- * The wrapper keeps track of the position in the data stream.
- */
-public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
-       private final FSDataInputStream stream;
-       private long pos;
-       private long len;
-
-       public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
-               this.stream = stream;
-               this.pos = 0;
-               this.len = len;
-       }
-
-       public long length() throws IOException {
-               return this.len;
-       }
-
-       public int read(byte[] b, int off, int len) throws IOException {
-               int read;
-               read = stream.read(b, off, len);
-               pos += read;
-               return read;
-       }
-
-       public void seek(long p) throws IOException {
-               stream.seek(p);
-               pos = p;
-       }
-
-       public long tell() throws IOException {
-               return pos;
-       }
-
-       public void close() throws IOException {
-               stream.close();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
deleted file mode 100644
index 73067c1..0000000
--- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.common.io.CheckpointableInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.InstantiationUtil;
-
-/**
- * Provides a {@link FileInputFormat} for Avro records.
- *
- * @param <E>
- *            the type of the result Avro record. If you specify
- *            {@link GenericRecord} then the result will be returned as a
- *            {@link GenericRecord}, so you do not have to know the schema 
ahead
- *            of time.
- */
-public class AvroInputFormat<E> extends FileInputFormat<E> implements 
ResultTypeQueryable<E>,
-       CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(AvroInputFormat.class);
-
-       private final Class<E> avroValueType;
-       
-       private boolean reuseAvroValue = true;
-
-       private transient DataFileReader<E> dataFileReader;
-
-       private transient long end;
-
-       private transient long recordsReadSinceLastSync;
-
-       private long lastSync = -1l;
-
-       public AvroInputFormat(Path filePath, Class<E> type) {
-               super(filePath);
-               this.avroValueType = type;
-       }
-
-       /**
-        * Sets the flag whether to reuse the Avro value instance for all 
records.
-        * By default, the input format reuses the Avro value.
-        *
-        * @param reuseAvroValue True, if the input format should reuse the 
Avro value instance, false otherwise.
-        */
-       public void setReuseAvroValue(boolean reuseAvroValue) {
-               this.reuseAvroValue = reuseAvroValue;
-       }
-
-       /**
-        * If set, the InputFormat will only read entire files.
-        */
-       public void setUnsplittable(boolean unsplittable) {
-               this.unsplittable = unsplittable;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // Typing
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public TypeInformation<E> getProducedType() {
-               return TypeExtractor.getForClass(this.avroValueType);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // Input Format Methods
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
-               dataFileReader = initReader(split);
-               dataFileReader.sync(split.getStart());
-               lastSync = dataFileReader.previousSync();
-       }
-
-       private DataFileReader<E> initReader(FileInputSplit split) throws 
IOException {
-               DatumReader<E> datumReader;
-
-               if (org.apache.avro.generic.GenericRecord.class == 
avroValueType) {
-                       datumReader = new GenericDatumReader<E>();
-               } else {
-                       datumReader = 
org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
-                               ? new SpecificDatumReader<E>(avroValueType) : 
new ReflectDatumReader<E>(avroValueType);
-               }
-               if (LOG.isInfoEnabled()) {
-                       LOG.info("Opening split {}", split);
-               }
-
-               SeekableInput in = new FSDataInputStreamWrapper(stream, 
split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
-               DataFileReader<E> dataFileReader = (DataFileReader) 
DataFileReader.openReader(in, datumReader);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Loaded SCHEMA: {}", 
dataFileReader.getSchema());
-               }
-
-               end = split.getStart() + split.getLength();
-               recordsReadSinceLastSync = 0;
-               return dataFileReader;
-       }
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return !dataFileReader.hasNext() || 
dataFileReader.pastSync(end);
-       }
-
-       public long getRecordsReadFromBlock() {
-               return this.recordsReadSinceLastSync;
-       }
-
-       @Override
-       public E nextRecord(E reuseValue) throws IOException {
-               if (reachedEnd()) {
-                       return null;
-               }
-
-               // if we start a new block, then register the event, and
-               // restart the counter.
-               if(dataFileReader.previousSync() != lastSync) {
-                       lastSync = dataFileReader.previousSync();
-                       recordsReadSinceLastSync = 0;
-               }
-               recordsReadSinceLastSync++;
-
-               if (reuseAvroValue) {
-                       return dataFileReader.next(reuseValue);
-               } else {
-                       if (GenericRecord.class == avroValueType) {
-                               return dataFileReader.next();
-                       } else {
-                               return 
dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
-                       }
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Checkpointing
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public Tuple2<Long, Long> getCurrentState() throws IOException {
-               return new Tuple2<>(this.lastSync, 
this.recordsReadSinceLastSync);
-       }
-
-       @Override
-       public void reopen(FileInputSplit split, Tuple2<Long, Long> state) 
throws IOException {
-               Preconditions.checkNotNull(split, "reopen() cannot be called on 
a null split.");
-               Preconditions.checkNotNull(state, "reopen() cannot be called 
with a null initial state.");
-
-               try {
-                       this.open(split);
-               } finally {
-                       if (state.f0 != -1) {
-                               lastSync = state.f0;
-                               recordsReadSinceLastSync = state.f1;
-                       }
-               }
-
-               if (lastSync != -1) {
-                       // open and read until the record we were before
-                       // the checkpoint and discard the values
-                       dataFileReader.seek(lastSync);
-                       for(int i = 0; i < recordsReadSinceLastSync; i++) {
-                               dataFileReader.next(null);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
 
b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
deleted file mode 100644
index 600d1e5..0000000
--- 
a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public class AvroOutputFormat<E> extends FileOutputFormat<E> implements 
Serializable {
-
-       /**
-        * Wrapper which encapsulates the supported codec and a related 
serialization byte.
-        */
-       public enum Codec {
-
-               NULL((byte)0, CodecFactory.nullCodec()),
-               SNAPPY((byte)1, CodecFactory.snappyCodec()),
-               BZIP2((byte)2, CodecFactory.bzip2Codec()),
-               DEFLATE((byte)3, 
CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
-               XZ((byte)4, 
CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
-
-               private byte codecByte;
-
-               private CodecFactory codecFactory;
-
-               Codec(final byte codecByte, final CodecFactory codecFactory) {
-                       this.codecByte = codecByte;
-                       this.codecFactory = codecFactory;
-               }
-
-               private byte getCodecByte() {
-                       return codecByte;
-               }
-
-               private CodecFactory getCodecFactory() {
-                       return codecFactory;
-               }
-
-               private static Codec forCodecByte(byte codecByte) {
-                       for (final Codec codec : Codec.values()) {
-                               if (codec.getCodecByte() == codecByte) {
-                                       return codec;
-                               }
-                       }
-                       throw new IllegalArgumentException("no codec for 
codecByte: " + codecByte);
-               }
-       }
-
-       private static final long serialVersionUID = 1L;
-
-       private final Class<E> avroValueType;
-
-       private transient Schema userDefinedSchema = null;
-
-       private transient Codec codec = null;
-       
-       private transient DataFileWriter<E> dataFileWriter;
-
-       public AvroOutputFormat(Path filePath, Class<E> type) {
-               super(filePath);
-               this.avroValueType = type;
-       }
-
-       public AvroOutputFormat(Class<E> type) {
-               this.avroValueType = type;
-       }
-
-       @Override
-       protected String getDirectoryFileName(int taskNumber) {
-               return super.getDirectoryFileName(taskNumber) + ".avro";
-       }
-
-       public void setSchema(Schema schema) {
-               this.userDefinedSchema = schema;
-       }
-
-       /**
-        * Set avro codec for compression.
-        *
-        * @param codec avro codec.
-        */
-       public void setCodec(final Codec codec) {
-               this.codec = checkNotNull(codec, "codec can not be null");
-       }
-
-       @Override
-       public void writeRecord(E record) throws IOException {
-               dataFileWriter.append(record);
-       }
-
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               super.open(taskNumber, numTasks);
-
-               DatumWriter<E> datumWriter;
-               Schema schema;
-               if 
(org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType))
 {
-                       datumWriter = new SpecificDatumWriter<E>(avroValueType);
-                       try {
-                               schema = 
((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
-                       } catch (InstantiationException e) {
-                               throw new RuntimeException(e.getMessage());
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException(e.getMessage());
-                       }
-               } else {
-                       datumWriter = new ReflectDatumWriter<E>(avroValueType);
-                       schema = ReflectData.get().getSchema(avroValueType);
-               }
-               dataFileWriter = new DataFileWriter<E>(datumWriter);
-               if (codec != null) {
-                       dataFileWriter.setCodec(codec.getCodecFactory());
-               }
-               if (userDefinedSchema == null) {
-                       dataFileWriter.create(schema, stream);
-               } else {
-                       dataFileWriter.create(userDefinedSchema, stream);
-               }
-       }
-
-       private void writeObject(java.io.ObjectOutputStream out) throws 
IOException {
-               out.defaultWriteObject();
-
-               if (codec != null) {
-                       out.writeByte(codec.getCodecByte());
-               } else {
-                       out.writeByte(-1);
-               }
-
-               if(userDefinedSchema != null) {
-                       byte[] json = userDefinedSchema.toString().getBytes();
-                       out.writeInt(json.length);
-                       out.write(json);
-               } else {
-                       out.writeInt(0);
-               }
-       }
-
-       private void readObject(java.io.ObjectInputStream in) throws 
IOException, ClassNotFoundException {
-               in.defaultReadObject();
-
-               byte codecByte = in.readByte();
-               if (codecByte >= 0) {
-                       setCodec(Codec.forCodecByte(codecByte));
-               }
-
-               int length = in.readInt();
-               if(length != 0) {
-                       byte[] json = new byte[length];
-                       in.readFully(json);
-
-                       Schema schema = new Schema.Parser().parse(new 
String(json));
-                       setSchema(schema);
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               dataFileWriter.flush();
-               dataFileWriter.close();
-               super.close();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml 
b/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml
deleted file mode 100644
index 0f4561a..0000000
--- a/flink-batch-connectors/flink-avro/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<assembly>
-       <id>test-jar</id>
-       <formats>
-               <format>jar</format>
-       </formats>
-       <includeBaseDirectory>false</includeBaseDirectory>
-       <fileSets>
-               <fileSet>
-                       
<directory>${project.build.testOutputDirectory}</directory>
-                       <outputDirectory>/</outputDirectory>
-                       <!--modify/add include to match your package(s) -->
-                       <includes>
-                               
<include>org/apache/flink/api/avro/testjar/**</include>
-                       </includes>
-               </fileSet>
-       </fileSets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
deleted file mode 100644
index 1030ff8..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.File;
-
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class AvroExternalJarProgramITCase {
-
-       private static final String JAR_FILE = "maven-test-jar.jar";
-
-       private static final String TEST_DATA_FILE = "/testdata.avro";
-
-       @Test
-       public void testExternalProgram() {
-
-               LocalFlinkMiniCluster testMiniCluster = null;
-
-               try {
-                       Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-                       testMiniCluster = new LocalFlinkMiniCluster(config, 
false);
-                       testMiniCluster.start();
-
-                       String jarFile = JAR_FILE;
-                       String testData = 
getClass().getResource(TEST_DATA_FILE).toString();
-
-                       PackagedProgram program = new PackagedProgram(new 
File(jarFile), new String[] { testData });
-
-
-                       
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-                       
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
testMiniCluster.getLeaderRPCPort());
-
-                       ClusterClient client = new 
StandaloneClusterClient(config);
-
-                       client.setPrintStatusDuringExecution(false);
-                       client.run(program, 4);
-
-               }
-               catch (Throwable t) {
-                       System.err.println(t.getMessage());
-                       t.printStackTrace();
-                       Assert.fail("Error during the packaged program 
execution: " + t.getMessage());
-               }
-               finally {
-                       if (testMiniCluster != null) {
-                               try {
-                                       testMiniCluster.stop();
-                               } catch (Throwable t) {
-                                       // ignore
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
deleted file mode 100644
index 3b01ccb..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class AvroOutputFormatITCase extends JavaProgramTestBase {
-
-       public static String outputPath1;
-
-       public static String outputPath2;
-
-       public static String inputPath;
-
-       public static String userData = "alice|1|blue\n" +
-               "bob|2|red\n" +
-               "john|3|yellow\n" +
-               "walt|4|black\n";
-
-       @Override
-       protected void preSubmit() throws Exception {
-               inputPath = createTempFile("user", userData);
-               outputPath1 = getTempDirPath("avro_output1");
-               outputPath2 = getTempDirPath("avro_output2");
-       }
-
-
-       @Override
-       protected void testProgram() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple3<String, Integer, String>> input = 
env.readCsvFile(inputPath)
-                       .fieldDelimiter("|")
-                       .types(String.class, Integer.class, String.class);
-
-               //output the data with AvroOutputFormat for specific user type
-               DataSet<User> specificUser = input.map(new ConvertToUser());
-               AvroOutputFormat<User> avroOutputFormat = new 
AvroOutputFormat<User>(User.class);
-               avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // 
FLINK-4771: use a codec
-               avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure 
the OF is properly serializing the schema
-               specificUser.write(avroOutputFormat, outputPath1);
-
-               //output the data with AvroOutputFormat for reflect user type
-               DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new 
ConvertToReflective());
-               reflectiveUser.write(new 
AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
-               env.execute();
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               //compare result for specific user type
-               File [] output1;
-               File file1 = asFile(outputPath1);
-               if (file1.isDirectory()) {
-                       output1 = file1.listFiles();
-                       // check for avro ext in dir.
-                       for (File avroOutput : output1) {
-                               Assert.assertTrue("Expect extension '.avro'", 
avroOutput.toString().endsWith(".avro"));
-                       }
-               } else {
-                       output1 = new File[] {file1};
-               }
-               List<String> result1 = new ArrayList<String>();
-               DatumReader<User> userDatumReader1 = new 
SpecificDatumReader<User>(User.class);
-               for (File avroOutput : output1) {
-
-                       DataFileReader<User> dataFileReader1 = new 
DataFileReader<User>(avroOutput, userDatumReader1);
-                       while (dataFileReader1.hasNext()) {
-                               User user = dataFileReader1.next();
-                               result1.add(user.getName() + "|" + 
user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-                       }
-               }
-               for (String expectedResult : userData.split("\n")) {
-                       Assert.assertTrue("expected user " + expectedResult + " 
not found.", result1.contains(expectedResult));
-               }
-
-               //compare result for reflect user type
-               File [] output2;
-               File file2 = asFile(outputPath2);
-               if (file2.isDirectory()) {
-                       output2 = file2.listFiles();
-               } else {
-                       output2 = new File[] {file2};
-               }
-               List<String> result2 = new ArrayList<String>();
-               DatumReader<ReflectiveUser> userDatumReader2 = new 
ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
-               for (File avroOutput : output2) {
-                       DataFileReader<ReflectiveUser> dataFileReader2 = new 
DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
-                       while (dataFileReader2.hasNext()) {
-                               ReflectiveUser user = dataFileReader2.next();
-                               result2.add(user.getName() + "|" + 
user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-                       }
-               }
-               for (String expectedResult : userData.split("\n")) {
-                       Assert.assertTrue("expected user " + expectedResult + " 
not found.", result2.contains(expectedResult));
-               }
-
-
-       }
-
-
-       public final static class ConvertToUser extends 
RichMapFunction<Tuple3<String, Integer, String>, User> {
-
-               @Override
-               public User map(Tuple3<String, Integer, String> value) throws 
Exception {
-                       return new User(value.f0, value.f1, value.f2);
-               }
-       }
-
-       public final static class ConvertToReflective extends 
RichMapFunction<User, ReflectiveUser> {
-
-               @Override
-               public ReflectiveUser map(User value) throws Exception {
-                       return new ReflectiveUser(value.getName().toString(), 
value.getFavoriteNumber(), value.getFavoriteColor().toString());
-               }
-       }
-
-       
-       public static class ReflectiveUser {
-               private String name;
-               private int favoriteNumber;
-               private String favoriteColor;
-
-               public ReflectiveUser() {}
-
-               public ReflectiveUser(String name, int favoriteNumber, String 
favoriteColor) {
-                       this.name = name;
-                       this.favoriteNumber = favoriteNumber;
-                       this.favoriteColor = favoriteColor;
-               }
-               
-               public String getName() {
-                       return this.name;
-               }
-               public String getFavoriteColor() {
-                       return this.favoriteColor;
-               }
-               public int getFavoriteNumber() {
-                       return this.favoriteNumber;
-               }
-       }
-}

Reply via email to