KAFKA-2774: Rename Copycat to Kafka Connect Author: Ewen Cheslack-Postava <m...@ewencp.org>
Reviewers: Gwen Shapira Closes #456 from ewencp/kafka-2774-rename-copycat (cherry picked from commit f2031d40639ef34c1591c22971394ef41c87652c) Signed-off-by: Gwen Shapira <csh...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/417e283d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/417e283d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/417e283d Branch: refs/heads/0.9.0 Commit: 417e283d643d8865aa3e79dffa373c8cc853d78f Parents: 4801322 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Sun Nov 8 22:11:03 2015 -0800 Committer: Gwen Shapira <csh...@gmail.com> Committed: Sun Nov 8 22:11:26 2015 -0800 ---------------------------------------------------------------------- bin/connect-distributed.sh | 23 + bin/connect-standalone.sh | 23 + bin/copycat-distributed.sh | 23 - bin/copycat-standalone.sh | 23 - bin/kafka-run-class.sh | 6 +- build.gradle | 48 +- checkstyle/import-control.xml | 30 +- .../consumer/internals/AbstractCoordinator.java | 2 +- config/connect-console-sink.properties | 19 + config/connect-console-source.properties | 19 + config/connect-distributed.properties | 42 + config/connect-file-sink.properties | 20 + config/connect-file-source.properties | 20 + config/connect-log4j.properties | 23 + config/connect-standalone.properties | 37 + config/copycat-console-sink.properties | 19 - config/copycat-console-source.properties | 19 - config/copycat-distributed.properties | 42 - config/copycat-file-sink.properties | 20 - config/copycat-file-source.properties | 20 - config/copycat-log4j.properties | 23 - config/copycat-standalone.properties | 37 - .../kafka/connect/connector/ConnectRecord.java | 122 +++ .../kafka/connect/connector/Connector.java | 124 +++ .../connect/connector/ConnectorContext.java | 33 + .../apache/kafka/connect/connector/Task.java | 56 ++ .../kafka/connect/data/ConnectSchema.java | 323 +++++++ .../org/apache/kafka/connect/data/Date.java | 76 
++ .../org/apache/kafka/connect/data/Decimal.java | 87 ++ .../org/apache/kafka/connect/data/Field.java | 77 ++ .../org/apache/kafka/connect/data/Schema.java | 163 ++++ .../kafka/connect/data/SchemaAndValue.java | 62 ++ .../kafka/connect/data/SchemaBuilder.java | 412 +++++++++ .../kafka/connect/data/SchemaProjector.java | 197 ++++ .../org/apache/kafka/connect/data/Struct.java | 265 ++++++ .../org/apache/kafka/connect/data/Time.java | 77 ++ .../apache/kafka/connect/data/Timestamp.java | 64 ++ .../kafka/connect/errors/ConnectException.java | 40 + .../kafka/connect/errors/DataException.java | 35 + .../errors/IllegalWorkerStateException.java | 35 + .../connect/errors/SchemaBuilderException.java | 32 + .../errors/SchemaProjectorException.java | 29 + .../kafka/connect/sink/SinkConnector.java | 40 + .../apache/kafka/connect/sink/SinkRecord.java | 72 ++ .../org/apache/kafka/connect/sink/SinkTask.java | 107 +++ .../kafka/connect/sink/SinkTaskContext.java | 82 ++ .../kafka/connect/source/SourceConnector.java | 29 + .../kafka/connect/source/SourceRecord.java | 109 +++ .../apache/kafka/connect/source/SourceTask.java | 82 ++ .../kafka/connect/source/SourceTaskContext.java | 32 + .../apache/kafka/connect/storage/Converter.java | 57 ++ .../connect/storage/OffsetStorageReader.java | 65 ++ .../kafka/connect/storage/StringConverter.java | 81 ++ .../kafka/connect/util/ConnectorUtils.java | 66 ++ .../connector/ConnectorReconfigurationTest.java | 82 ++ .../kafka/connect/data/ConnectSchemaTest.java | 303 ++++++ .../org/apache/kafka/connect/data/DateTest.java | 78 ++ .../apache/kafka/connect/data/DecimalTest.java | 63 ++ .../apache/kafka/connect/data/FieldTest.java | 40 + .../kafka/connect/data/SchemaBuilderTest.java | 305 ++++++ .../kafka/connect/data/SchemaProjectorTest.java | 495 ++++++++++ .../apache/kafka/connect/data/StructTest.java | 222 +++++ .../org/apache/kafka/connect/data/TimeTest.java | 80 ++ .../kafka/connect/data/TimestampTest.java | 75 ++ 
.../connect/storage/StringConverterTest.java | 83 ++ .../kafka/connect/util/ConnectorUtilsTest.java | 67 ++ .../connect/file/FileStreamSinkConnector.java | 69 ++ .../kafka/connect/file/FileStreamSinkTask.java | 94 ++ .../connect/file/FileStreamSourceConnector.java | 77 ++ .../connect/file/FileStreamSourceTask.java | 216 +++++ .../file/FileStreamSinkConnectorTest.java | 86 ++ .../connect/file/FileStreamSinkTaskTest.java | 69 ++ .../file/FileStreamSourceConnectorTest.java | 105 +++ .../connect/file/FileStreamSourceTaskTest.java | 150 +++ .../kafka/connect/json/JsonConverter.java | 735 +++++++++++++++ .../kafka/connect/json/JsonDeserializer.java | 62 ++ .../apache/kafka/connect/json/JsonSchema.java | 82 ++ .../kafka/connect/json/JsonSerializer.java | 60 ++ .../kafka/connect/json/JsonConverterTest.java | 644 +++++++++++++ .../kafka/connect/cli/ConnectDistributed.java | 67 ++ .../kafka/connect/cli/ConnectStandalone.java | 98 ++ .../connect/errors/AlreadyExistsException.java | 35 + .../kafka/connect/errors/NotFoundException.java | 35 + .../connect/errors/RetriableException.java | 35 + .../apache/kafka/connect/runtime/Connect.java | 99 ++ .../kafka/connect/runtime/ConnectorConfig.java | 73 ++ .../apache/kafka/connect/runtime/Herder.java | 148 +++ .../connect/runtime/HerderConnectorContext.java | 42 + .../runtime/SourceTaskOffsetCommitter.java | 139 +++ .../kafka/connect/runtime/TaskConfig.java | 54 ++ .../apache/kafka/connect/runtime/Worker.java | 331 +++++++ .../kafka/connect/runtime/WorkerConfig.java | 138 +++ .../kafka/connect/runtime/WorkerSinkTask.java | 370 ++++++++ .../connect/runtime/WorkerSinkTaskContext.java | 111 +++ .../connect/runtime/WorkerSinkTaskThread.java | 116 +++ .../kafka/connect/runtime/WorkerSourceTask.java | 339 +++++++ .../runtime/WorkerSourceTaskContext.java | 35 + .../kafka/connect/runtime/WorkerTask.java | 54 ++ .../runtime/distributed/ClusterConfigState.java | 145 +++ .../runtime/distributed/ConnectProtocol.java | 269 ++++++ 
.../runtime/distributed/DistributedConfig.java | 187 ++++ .../runtime/distributed/DistributedHerder.java | 920 +++++++++++++++++++ .../runtime/distributed/NotLeaderException.java | 47 + .../runtime/distributed/WorkerCoordinator.java | 294 ++++++ .../runtime/distributed/WorkerGroupMember.java | 185 ++++ .../distributed/WorkerRebalanceListener.java | 38 + .../kafka/connect/runtime/rest/RestServer.java | 258 ++++++ .../runtime/rest/entities/ConnectorInfo.java | 81 ++ .../rest/entities/CreateConnectorRequest.java | 59 ++ .../runtime/rest/entities/ErrorMessage.java | 63 ++ .../runtime/rest/entities/ServerInfo.java | 41 + .../connect/runtime/rest/entities/TaskInfo.java | 58 ++ .../rest/errors/ConnectExceptionMapper.java | 60 ++ .../rest/errors/ConnectRestException.java | 70 ++ .../rest/resources/ConnectorsResource.java | 201 ++++ .../runtime/rest/resources/RootResource.java | 36 + .../runtime/standalone/StandaloneConfig.java | 35 + .../runtime/standalone/StandaloneHerder.java | 272 ++++++ .../connect/storage/FileOffsetBackingStore.java | 102 ++ .../connect/storage/KafkaConfigStorage.java | 578 ++++++++++++ .../storage/KafkaOffsetBackingStore.java | 213 +++++ .../storage/MemoryOffsetBackingStore.java | 105 +++ .../connect/storage/OffsetBackingStore.java | 72 ++ .../storage/OffsetStorageReaderImpl.java | 110 +++ .../connect/storage/OffsetStorageWriter.java | 207 +++++ .../kafka/connect/storage/OffsetUtils.java | 54 ++ .../org/apache/kafka/connect/util/Callback.java | 31 + .../kafka/connect/util/ConnectorTaskId.java | 85 ++ .../connect/util/ConvertingFutureCallback.java | 85 ++ .../kafka/connect/util/FutureCallback.java | 34 + .../kafka/connect/util/KafkaBasedLog.java | 331 +++++++ .../kafka/connect/util/ShutdownableThread.java | 145 +++ .../connect/runtime/WorkerSinkTaskTest.java | 208 +++++ .../runtime/WorkerSinkTaskThreadedTest.java | 563 ++++++++++++ .../connect/runtime/WorkerSourceTaskTest.java | 308 +++++++ .../kafka/connect/runtime/WorkerTest.java | 397 ++++++++ 
.../distributed/DistributedHerderTest.java | 573 ++++++++++++ .../distributed/WorkerCoordinatorTest.java | 443 +++++++++ .../rest/resources/ConnectorsResourceTest.java | 364 ++++++++ .../standalone/StandaloneHerderTest.java | 337 +++++++ .../storage/FileOffsetBackingStoreTest.java | 117 +++ .../connect/storage/KafkaConfigStorageTest.java | 522 +++++++++++ .../storage/KafkaOffsetBackingStoreTest.java | 357 +++++++ .../storage/OffsetStorageWriterTest.java | 272 ++++++ .../util/ByteArrayProducerRecordEquals.java | 53 ++ .../kafka/connect/util/KafkaBasedLogTest.java | 437 +++++++++ .../org/apache/kafka/connect/util/MockTime.java | 49 + .../connect/util/ShutdownableThreadTest.java | 72 ++ .../TestBackgroundThreadExceptionHandler.java | 37 + .../apache/kafka/connect/util/TestFuture.java | 161 ++++ .../apache/kafka/connect/util/ThreadedTest.java | 43 + .../runtime/src/test/resources/log4j.properties | 23 + .../kafka/copycat/connector/Connector.java | 124 --- .../copycat/connector/ConnectorContext.java | 33 - .../kafka/copycat/connector/CopycatRecord.java | 122 --- .../apache/kafka/copycat/connector/Task.java | 56 -- .../kafka/copycat/data/CopycatSchema.java | 323 ------- .../org/apache/kafka/copycat/data/Date.java | 76 -- .../org/apache/kafka/copycat/data/Decimal.java | 87 -- .../org/apache/kafka/copycat/data/Field.java | 77 -- .../org/apache/kafka/copycat/data/Schema.java | 163 ---- .../kafka/copycat/data/SchemaAndValue.java | 62 -- .../kafka/copycat/data/SchemaBuilder.java | 412 --------- .../kafka/copycat/data/SchemaProjector.java | 197 ---- .../org/apache/kafka/copycat/data/Struct.java | 265 ------ .../org/apache/kafka/copycat/data/Time.java | 77 -- .../apache/kafka/copycat/data/Timestamp.java | 64 -- .../kafka/copycat/errors/CopycatException.java | 40 - .../kafka/copycat/errors/DataException.java | 35 - .../errors/IllegalWorkerStateException.java | 35 - .../copycat/errors/SchemaBuilderException.java | 32 - .../errors/SchemaProjectorException.java | 29 - 
.../kafka/copycat/sink/SinkConnector.java | 40 - .../apache/kafka/copycat/sink/SinkRecord.java | 72 -- .../org/apache/kafka/copycat/sink/SinkTask.java | 107 --- .../kafka/copycat/sink/SinkTaskContext.java | 82 -- .../kafka/copycat/source/SourceConnector.java | 29 - .../kafka/copycat/source/SourceRecord.java | 109 --- .../apache/kafka/copycat/source/SourceTask.java | 82 -- .../kafka/copycat/source/SourceTaskContext.java | 32 - .../apache/kafka/copycat/storage/Converter.java | 57 -- .../copycat/storage/OffsetStorageReader.java | 65 -- .../kafka/copycat/storage/StringConverter.java | 81 -- .../kafka/copycat/util/ConnectorUtils.java | 66 -- .../connector/ConnectorReconfigurationTest.java | 82 -- .../kafka/copycat/data/CopycatSchemaTest.java | 303 ------ .../org/apache/kafka/copycat/data/DateTest.java | 78 -- .../apache/kafka/copycat/data/DecimalTest.java | 63 -- .../apache/kafka/copycat/data/FieldTest.java | 40 - .../kafka/copycat/data/SchemaBuilderTest.java | 305 ------ .../kafka/copycat/data/SchemaProjectorTest.java | 495 ---------- .../apache/kafka/copycat/data/StructTest.java | 222 ----- .../org/apache/kafka/copycat/data/TimeTest.java | 80 -- .../kafka/copycat/data/TimestampTest.java | 75 -- .../copycat/storage/StringConverterTest.java | 83 -- .../kafka/copycat/util/ConnectorUtilsTest.java | 67 -- .../copycat/file/FileStreamSinkConnector.java | 69 -- .../kafka/copycat/file/FileStreamSinkTask.java | 94 -- .../copycat/file/FileStreamSourceConnector.java | 77 -- .../copycat/file/FileStreamSourceTask.java | 216 ----- .../file/FileStreamSinkConnectorTest.java | 86 -- .../copycat/file/FileStreamSinkTaskTest.java | 69 -- .../file/FileStreamSourceConnectorTest.java | 105 --- .../copycat/file/FileStreamSourceTaskTest.java | 150 --- .../kafka/copycat/json/JsonConverter.java | 735 --------------- .../kafka/copycat/json/JsonDeserializer.java | 62 -- .../apache/kafka/copycat/json/JsonSchema.java | 82 -- .../kafka/copycat/json/JsonSerializer.java | 60 -- 
.../kafka/copycat/json/JsonConverterTest.java | 644 ------------- .../kafka/copycat/cli/CopycatDistributed.java | 67 -- .../kafka/copycat/cli/CopycatStandalone.java | 98 -- .../copycat/errors/AlreadyExistsException.java | 35 - .../kafka/copycat/errors/NotFoundException.java | 35 - .../copycat/errors/RetriableException.java | 35 - .../kafka/copycat/runtime/ConnectorConfig.java | 74 -- .../apache/kafka/copycat/runtime/Copycat.java | 99 -- .../apache/kafka/copycat/runtime/Herder.java | 148 --- .../copycat/runtime/HerderConnectorContext.java | 42 - .../runtime/SourceTaskOffsetCommitter.java | 139 --- .../kafka/copycat/runtime/TaskConfig.java | 54 -- .../apache/kafka/copycat/runtime/Worker.java | 331 ------- .../kafka/copycat/runtime/WorkerConfig.java | 138 --- .../kafka/copycat/runtime/WorkerSinkTask.java | 370 -------- .../copycat/runtime/WorkerSinkTaskContext.java | 111 --- .../copycat/runtime/WorkerSinkTaskThread.java | 116 --- .../kafka/copycat/runtime/WorkerSourceTask.java | 339 ------- .../runtime/WorkerSourceTaskContext.java | 35 - .../kafka/copycat/runtime/WorkerTask.java | 54 -- .../runtime/distributed/ClusterConfigState.java | 145 --- .../runtime/distributed/CopycatProtocol.java | 269 ------ .../runtime/distributed/DistributedConfig.java | 187 ---- .../runtime/distributed/DistributedHerder.java | 920 ------------------- .../runtime/distributed/NotLeaderException.java | 47 - .../runtime/distributed/WorkerCoordinator.java | 293 ------ .../runtime/distributed/WorkerGroupMember.java | 185 ---- .../distributed/WorkerRebalanceListener.java | 38 - .../kafka/copycat/runtime/rest/RestServer.java | 258 ------ .../runtime/rest/entities/ConnectorInfo.java | 81 -- .../rest/entities/CreateConnectorRequest.java | 59 -- .../runtime/rest/entities/ErrorMessage.java | 63 -- .../runtime/rest/entities/ServerInfo.java | 41 - .../copycat/runtime/rest/entities/TaskInfo.java | 58 -- .../rest/errors/CopycatExceptionMapper.java | 60 -- .../rest/errors/CopycatRestException.java | 70 -- 
.../rest/resources/ConnectorsResource.java | 201 ---- .../runtime/rest/resources/RootResource.java | 36 - .../runtime/standalone/StandaloneConfig.java | 35 - .../runtime/standalone/StandaloneHerder.java | 272 ------ .../copycat/storage/FileOffsetBackingStore.java | 102 -- .../copycat/storage/KafkaConfigStorage.java | 578 ------------ .../storage/KafkaOffsetBackingStore.java | 213 ----- .../storage/MemoryOffsetBackingStore.java | 105 --- .../copycat/storage/OffsetBackingStore.java | 72 -- .../storage/OffsetStorageReaderImpl.java | 110 --- .../copycat/storage/OffsetStorageWriter.java | 207 ----- .../kafka/copycat/storage/OffsetUtils.java | 54 -- .../org/apache/kafka/copycat/util/Callback.java | 31 - .../kafka/copycat/util/ConnectorTaskId.java | 85 -- .../copycat/util/ConvertingFutureCallback.java | 85 -- .../kafka/copycat/util/FutureCallback.java | 34 - .../kafka/copycat/util/KafkaBasedLog.java | 331 ------- .../kafka/copycat/util/ShutdownableThread.java | 145 --- .../copycat/runtime/WorkerSinkTaskTest.java | 208 ----- .../runtime/WorkerSinkTaskThreadedTest.java | 563 ------------ .../copycat/runtime/WorkerSourceTaskTest.java | 308 ------- .../kafka/copycat/runtime/WorkerTest.java | 397 -------- .../distributed/DistributedHerderTest.java | 573 ------------ .../distributed/WorkerCoordinatorTest.java | 443 --------- .../rest/resources/ConnectorsResourceTest.java | 364 -------- .../standalone/StandaloneHerderTest.java | 337 ------- .../storage/FileOffsetBackingStoreTest.java | 117 --- .../copycat/storage/KafkaConfigStorageTest.java | 522 ----------- .../storage/KafkaOffsetBackingStoreTest.java | 357 ------- .../storage/OffsetStorageWriterTest.java | 272 ------ .../util/ByteArrayProducerRecordEquals.java | 53 -- .../kafka/copycat/util/KafkaBasedLogTest.java | 437 --------- .../org/apache/kafka/copycat/util/MockTime.java | 49 - .../copycat/util/ShutdownableThreadTest.java | 72 -- .../TestBackgroundThreadExceptionHandler.java | 37 - 
.../apache/kafka/copycat/util/TestFuture.java | 161 ---- .../apache/kafka/copycat/util/ThreadedTest.java | 43 - .../runtime/src/test/resources/log4j.properties | 23 - .../GroupCoordinatorResponseTest.scala | 2 +- docs/security.html | 2 +- settings.gradle | 2 +- tests/kafkatest/services/connect.py | 191 ++++ tests/kafkatest/services/copycat.py | 191 ---- .../kafkatest/tests/connect_distributed_test.py | 97 ++ tests/kafkatest/tests/connect_rest_test.py | 163 ++++ tests/kafkatest/tests/connect_test.py | 93 ++ .../kafkatest/tests/copycat_distributed_test.py | 97 -- tests/kafkatest/tests/copycat_rest_test.py | 163 ---- tests/kafkatest/tests/copycat_test.py | 93 -- .../templates/connect-distributed.properties | 40 + .../templates/connect-file-sink.properties | 20 + .../templates/connect-file-source.properties | 20 + .../templates/connect-standalone.properties | 32 + .../templates/copycat-distributed.properties | 40 - .../templates/copycat-file-sink.properties | 20 - .../templates/copycat-file-source.properties | 20 - .../templates/copycat-standalone.properties | 32 - 301 files changed, 21527 insertions(+), 21527 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/connect-distributed.sh ---------------------------------------------------------------------- diff --git a/bin/connect-distributed.sh b/bin/connect-distributed.sh new file mode 100755 index 0000000..d9f6325 --- /dev/null +++ b/bin/connect-distributed.sh @@ -0,0 +1,23 @@ +#!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0) + +if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then + export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties" +fi + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.connect.cli.ConnectDistributed "$@" http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/connect-standalone.sh ---------------------------------------------------------------------- diff --git a/bin/connect-standalone.sh b/bin/connect-standalone.sh new file mode 100755 index 0000000..a14df86 --- /dev/null +++ b/bin/connect-standalone.sh @@ -0,0 +1,23 @@ +#!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +base_dir=$(dirname $0) + +if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then + export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties" +fi + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.connect.cli.ConnectStandalone "$@" http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/copycat-distributed.sh ---------------------------------------------------------------------- diff --git a/bin/copycat-distributed.sh b/bin/copycat-distributed.sh deleted file mode 100755 index 4d62300..0000000 --- a/bin/copycat-distributed.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/sh -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -base_dir=$(dirname $0) - -if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then - export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties" -fi - -exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatDistributed "$@" http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/copycat-standalone.sh ---------------------------------------------------------------------- diff --git a/bin/copycat-standalone.sh b/bin/copycat-standalone.sh deleted file mode 100755 index b219f8a..0000000 --- a/bin/copycat-standalone.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/sh -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -base_dir=$(dirname $0) - -if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then - export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties" -fi - -exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatStandalone "$@" http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/kafka-run-class.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index cfddae0..b18a9cf 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -74,12 +74,12 @@ done for cc_pkg in "api" "runtime" "file" "json" do - for file in $base_dir/copycat/${cc_pkg}/build/libs/copycat-${cc_pkg}*.jar; + for file in $base_dir/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar; do CLASSPATH=$CLASSPATH:$file done - if [ -d "$base_dir/copycat/${cc_pkg}/build/dependant-libs" ] ; then - CLASSPATH=$CLASSPATH:$base_dir/copycat/${cc_pkg}/build/dependant-libs/* + if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then + CLASSPATH=$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/* fi done http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 764f9e6..49733c4 100644 --- a/build.gradle +++ b/build.gradle @@ -228,17 +228,17 @@ for ( sv in ['2_10_5', '2_11_7'] ) { } } -def copycatPkgs = ['copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file'] -def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + copycatPkgs +def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file'] +def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + connectPkgs -tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {} 
+tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {} tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { } tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7'] + pkgs.collect { it + ":srcJar" }) { } tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7'] + pkgs.collect { it + ":docsJar" }) { } -tasks.create(name: "testCopycat", dependsOn: copycatPkgs.collect { it + ":test" }) {} +tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {} tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) { } tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) { @@ -675,9 +675,9 @@ project(':log4j-appender') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':copycat:api') { +project(':connect:api') { apply plugin: 'checkstyle' - archivesBaseName = "copycat-api" + archivesBaseName = "connect-api" dependencies { compile "$slf4japi" @@ -700,7 +700,7 @@ project(':copycat:api') { } javadoc { - include "**/org/apache/kafka/copycat/*" + include "**/org/apache/kafka/connect/*" } tasks.create(name: "copyDependantLibs", type: Copy) { @@ -709,7 +709,7 @@ project(':copycat:api') { } from (configurations.runtime) { exclude('kafka-clients*') - exclude('copycat-*') + exclude('connect-*') } into "$buildDir/dependant-libs" } @@ -732,12 +732,12 @@ project(':copycat:api') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':copycat:json') { +project(':connect:json') { apply plugin: 'checkstyle' - archivesBaseName = "copycat-json" + archivesBaseName = "connect-json" dependencies { - compile project(':copycat:api') + compile project(':connect:api') compile "$slf4japi" compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" @@ -761,7 +761,7 @@ project(':copycat:json') { } javadoc { - 
include "**/org/apache/kafka/copycat/*" + include "**/org/apache/kafka/connect/*" } tasks.create(name: "copyDependantLibs", type: Copy) { @@ -770,7 +770,7 @@ project(':copycat:json') { } from (configurations.runtime) { exclude('kafka-clients*') - exclude('copycat-*') + exclude('connect-*') } into "$buildDir/dependant-libs" } @@ -793,12 +793,12 @@ project(':copycat:json') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':copycat:runtime') { +project(':connect:runtime') { apply plugin: 'checkstyle' - archivesBaseName = "copycat-runtime" + archivesBaseName = "connect-runtime" dependencies { - compile project(':copycat:api') + compile project(':connect:api') compile project(':clients') compile "$slf4japi" @@ -813,7 +813,7 @@ project(':copycat:runtime') { testCompile "$powermock_easymock" testCompile project(':clients').sourceSets.test.output testRuntime "$slf4jlog4j" - testRuntime project(":copycat:json") + testRuntime project(":connect:json") } task testJar(type: Jar) { @@ -829,7 +829,7 @@ project(':copycat:runtime') { } javadoc { - include "**/org/apache/kafka/copycat/*" + include "**/org/apache/kafka/connect/*" } tasks.create(name: "copyDependantLibs", type: Copy) { @@ -838,7 +838,7 @@ project(':copycat:runtime') { } from (configurations.runtime) { exclude('kafka-clients*') - exclude('copycat-*') + exclude('connect-*') } into "$buildDir/dependant-libs" } @@ -861,12 +861,12 @@ project(':copycat:runtime') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':copycat:file') { +project(':connect:file') { apply plugin: 'checkstyle' - archivesBaseName = "copycat-file" + archivesBaseName = "connect-file" dependencies { - compile project(':copycat:api') + compile project(':connect:api') compile "$slf4japi" testCompile "$junit" @@ -889,7 +889,7 @@ project(':copycat:file') { } javadoc { - include "**/org/apache/kafka/copycat/*" + include "**/org/apache/kafka/connect/*" } tasks.create(name: "copyDependantLibs", type: Copy) { @@ -898,7 +898,7 @@ 
project(':copycat:file') { } from (configurations.runtime) { exclude('kafka-clients*') - exclude('copycat-*') + exclude('connect-*') } into "$buildDir/dependant-libs" } http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 75027f5..95ea3b7 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -141,26 +141,26 @@ <allow pkg="org.bouncycastle" /> </subpackage> - <subpackage name="copycat"> + <subpackage name="connect"> <allow pkg="org.apache.kafka.common" /> - <allow pkg="org.apache.kafka.copycat.data" /> - <allow pkg="org.apache.kafka.copycat.errors" /> + <allow pkg="org.apache.kafka.connect.data" /> + <allow pkg="org.apache.kafka.connect.errors" /> <allow pkg="org.apache.kafka.clients" /> <allow pkg="org.apache.kafka.test"/> <subpackage name="source"> - <allow pkg="org.apache.kafka.copycat.connector" /> - <allow pkg="org.apache.kafka.copycat.storage" /> + <allow pkg="org.apache.kafka.connect.connector" /> + <allow pkg="org.apache.kafka.connect.storage" /> </subpackage> <subpackage name="sink"> <allow pkg="org.apache.kafka.clients.consumer" /> - <allow pkg="org.apache.kafka.copycat.connector" /> - <allow pkg="org.apache.kafka.copycat.storage" /> + <allow pkg="org.apache.kafka.connect.connector" /> + <allow pkg="org.apache.kafka.connect.storage" /> </subpackage> <subpackage name="runtime"> - <allow pkg="org.apache.kafka.copycat" /> + <allow pkg="org.apache.kafka.connect" /> <subpackage name="rest"> <allow pkg="org.eclipse.jetty" /> @@ -172,19 +172,19 @@ </subpackage> <subpackage name="cli"> - <allow pkg="org.apache.kafka.copycat.runtime" /> - <allow pkg="org.apache.kafka.copycat.storage" /> - <allow pkg="org.apache.kafka.copycat.util" /> + <allow pkg="org.apache.kafka.connect.runtime" /> + <allow pkg="org.apache.kafka.connect.storage" /> + <allow 
pkg="org.apache.kafka.connect.util" /> <allow pkg="org.apache.kafka.common" /> </subpackage> <subpackage name="storage"> - <allow pkg="org.apache.kafka.copycat" /> + <allow pkg="org.apache.kafka.connect" /> <allow pkg="org.apache.kafka.common.serialization" /> </subpackage> <subpackage name="util"> - <allow pkg="org.apache.kafka.copycat" /> + <allow pkg="org.apache.kafka.connect" /> <!-- for annotations to avoid code duplication --> <allow pkg="com.fasterxml.jackson.annotation" /> </subpackage> @@ -193,11 +193,11 @@ <allow pkg="com.fasterxml.jackson" /> <allow pkg="org.apache.kafka.common.serialization" /> <allow pkg="org.apache.kafka.common.errors" /> - <allow pkg="org.apache.kafka.copycat.storage" /> + <allow pkg="org.apache.kafka.connect.storage" /> </subpackage> <subpackage name="file"> - <allow pkg="org.apache.kafka.copycat" /> + <allow pkg="org.apache.kafka.connect" /> <allow pkg="org.apache.kafka.clients.consumer" /> <!-- for tests --> <allow pkg="org.easymock" /> http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4d964c9..5b944d0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -126,7 +126,7 @@ public abstract class AbstractCoordinator implements Closeable { } /** - * Unique identifier for the class of protocols implements (e.g. "consumer" or "copycat"). + * Unique identifier for the class of protocols implements (e.g. "consumer" or "connect"). 
* @return Non-null protocol type name */ protected abstract String protocolType(); http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-console-sink.properties ---------------------------------------------------------------------- diff --git a/config/connect-console-sink.properties b/config/connect-console-sink.properties new file mode 100644 index 0000000..30dbe2f --- /dev/null +++ b/config/connect-console-sink.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name=local-console-sink +connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector +tasks.max=1 +topics=test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-console-source.properties ---------------------------------------------------------------------- diff --git a/config/connect-console-source.properties b/config/connect-console-source.properties new file mode 100644 index 0000000..d161e4e --- /dev/null +++ b/config/connect-console-source.properties @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name=local-console-source +connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector +tasks.max=1 +topic=test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-distributed.properties ---------------------------------------------------------------------- diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties new file mode 100644 index 0000000..9ec63db --- /dev/null +++ b/config/connect-distributed.properties @@ -0,0 +1,42 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +## + +# These are defaults. This file just demonstrates how to override some settings. +bootstrap.servers=localhost:9092 + +group.id=connect-cluster + +# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will +# need to configure these based on the format they want their data in when loaded from or stored into Kafka +key.converter=org.apache.kafka.connect.json.JsonConverter +value.converter=org.apache.kafka.connect.json.JsonConverter +# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply +# it to +key.converter.schemas.enable=true +value.converter.schemas.enable=true + +# The internal converter used for offsets and config data is configurable and must be specified, but most users will +# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format. +internal.key.converter=org.apache.kafka.connect.json.JsonConverter +internal.value.converter=org.apache.kafka.connect.json.JsonConverter +internal.key.converter.schemas.enable=false +internal.value.converter.schemas.enable=false + +offset.storage.topic=connect-offsets +# Flush much faster than normal, which is useful for testing/debugging +offset.flush.interval.ms=10000 +config.storage.topic=connect-configs \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-file-sink.properties ---------------------------------------------------------------------- diff --git a/config/connect-file-sink.properties b/config/connect-file-sink.properties new file mode 100644 index 0000000..17275b3 --- /dev/null +++ b/config/connect-file-sink.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name=local-file-sink +connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector +tasks.max=1 +file=test.sink.txt +topics=test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-file-source.properties ---------------------------------------------------------------------- diff --git a/config/connect-file-source.properties b/config/connect-file-source.properties new file mode 100644 index 0000000..31fa96c --- /dev/null +++ b/config/connect-file-source.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +name=local-file-source +connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector +tasks.max=1 +file=test.txt +topic=test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-log4j.properties ---------------------------------------------------------------------- diff --git a/config/connect-log4j.properties b/config/connect-log4j.properties new file mode 100644 index 0000000..158daed --- /dev/null +++ b/config/connect-log4j.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n + +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.I0Itec.zkclient=ERROR \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-standalone.properties ---------------------------------------------------------------------- diff --git a/config/connect-standalone.properties b/config/connect-standalone.properties new file mode 100644 index 0000000..8c4f98e --- /dev/null +++ b/config/connect-standalone.properties @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# These are defaults. This file just demonstrates how to override some settings. +bootstrap.servers=localhost:9092 + +# The converters specify the format of data in Kafka and how to translate it into Connect data. 
Every Connect user will +# need to configure these based on the format they want their data in when loaded from or stored into Kafka +key.converter=org.apache.kafka.connect.json.JsonConverter +value.converter=org.apache.kafka.connect.json.JsonConverter +# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply +# it to +key.converter.schemas.enable=true +value.converter.schemas.enable=true + +# The internal converter used for offsets and config data is configurable and must be specified, but most users will +# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format. +internal.key.converter=org.apache.kafka.connect.json.JsonConverter +internal.value.converter=org.apache.kafka.connect.json.JsonConverter +internal.key.converter.schemas.enable=false +internal.value.converter.schemas.enable=false + +offset.storage.file.filename=/tmp/connect.offsets +# Flush much faster than normal, which is useful for testing/debugging +offset.flush.interval.ms=10000 http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-console-sink.properties ---------------------------------------------------------------------- diff --git a/config/copycat-console-sink.properties b/config/copycat-console-sink.properties deleted file mode 100644 index 4cd4c33..0000000 --- a/config/copycat-console-sink.properties +++ /dev/null @@ -1,19 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name=local-console-sink -connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector -tasks.max=1 -topics=test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-console-source.properties ---------------------------------------------------------------------- diff --git a/config/copycat-console-source.properties b/config/copycat-console-source.properties deleted file mode 100644 index 17dbbf9..0000000 --- a/config/copycat-console-source.properties +++ /dev/null @@ -1,19 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -name=local-console-source -connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector -tasks.max=1 -topic=test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-distributed.properties ---------------------------------------------------------------------- diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties deleted file mode 100644 index 2ea5b73..0000000 --- a/config/copycat-distributed.properties +++ /dev/null @@ -1,42 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## - -# These are defaults. This file just demonstrates how to override some settings. -bootstrap.servers=localhost:9092 - -group.id=copycat-cluster - -# The converters specify the format of data in Kafka and how to translate it into Copycat data. 
Every Copycat user will -# need to configure these based on the format they want their data in when loaded from or stored into Kafka -key.converter=org.apache.kafka.copycat.json.JsonConverter -value.converter=org.apache.kafka.copycat.json.JsonConverter -# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply -# it to -key.converter.schemas.enable=true -value.converter.schemas.enable=true - -# The internal converter used for offsets and config data is configurable and must be specified, but most users will -# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format. -internal.key.converter=org.apache.kafka.copycat.json.JsonConverter -internal.value.converter=org.apache.kafka.copycat.json.JsonConverter -internal.key.converter.schemas.enable=false -internal.value.converter.schemas.enable=false - -offset.storage.topic=copycat-offsets -# Flush much faster than normal, which is useful for testing/debugging -offset.flush.interval.ms=10000 -config.storage.topic=copycat-configs \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-file-sink.properties ---------------------------------------------------------------------- diff --git a/config/copycat-file-sink.properties b/config/copycat-file-sink.properties deleted file mode 100644 index 3cc0d62..0000000 --- a/config/copycat-file-sink.properties +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name=local-file-sink -connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector -tasks.max=1 -file=test.sink.txt -topics=test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-file-source.properties ---------------------------------------------------------------------- diff --git a/config/copycat-file-source.properties b/config/copycat-file-source.properties deleted file mode 100644 index 7512e50..0000000 --- a/config/copycat-file-source.properties +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -name=local-file-source -connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector -tasks.max=1 -file=test.txt -topic=test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-log4j.properties ---------------------------------------------------------------------- diff --git a/config/copycat-log4j.properties b/config/copycat-log4j.properties deleted file mode 100644 index 158daed..0000000 --- a/config/copycat-log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -log4j.rootLogger=INFO, stdout - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n - -log4j.logger.org.apache.zookeeper=ERROR -log4j.logger.org.I0Itec.zkclient=ERROR \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-standalone.properties ---------------------------------------------------------------------- diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties deleted file mode 100644 index ebf689f..0000000 --- a/config/copycat-standalone.properties +++ /dev/null @@ -1,37 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# These are defaults. This file just demonstrates how to override some settings. -bootstrap.servers=localhost:9092 - -# The converters specify the format of data in Kafka and how to translate it into Copycat data. 
Every Copycat user will -# need to configure these based on the format they want their data in when loaded from or stored into Kafka -key.converter=org.apache.kafka.copycat.json.JsonConverter -value.converter=org.apache.kafka.copycat.json.JsonConverter -# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply -# it to -key.converter.schemas.enable=true -value.converter.schemas.enable=true - -# The internal converter used for offsets and config data is configurable and must be specified, but most users will -# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format. -internal.key.converter=org.apache.kafka.copycat.json.JsonConverter -internal.value.converter=org.apache.kafka.copycat.json.JsonConverter -internal.key.converter.schemas.enable=false -internal.value.converter.schemas.enable=false - -offset.storage.file.filename=/tmp/copycat.offsets -# Flush much faster than normal, which is useful for testing/debugging -offset.flush.interval.ms=10000 http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java new file mode 100644 index 0000000..21f0944 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.connector; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.connect.data.Schema; + +/** + * <p> + * Base class for records containing data to be copied to/from Kafka. This corresponds closely to + * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both + * sources and sinks (topic, kafkaPartition, key, value). Although both implementations include a + * notion of offset, it is not included here because they differ in type. 
+ * </p> + */ +@InterfaceStability.Unstable +public abstract class ConnectRecord { + private final String topic; + private final Integer kafkaPartition; + private final Schema keySchema; + private final Object key; + private final Schema valueSchema; + private final Object value; + + public ConnectRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) { + this(topic, kafkaPartition, null, null, valueSchema, value); + } + + public ConnectRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) { + this.topic = topic; + this.kafkaPartition = kafkaPartition; + this.keySchema = keySchema; + this.key = key; + this.valueSchema = valueSchema; + this.value = value; + } + + public String topic() { + return topic; + } + + public Integer kafkaPartition() { + return kafkaPartition; + } + + public Object key() { + return key; + } + + public Schema keySchema() { + return keySchema; + } + + public Object value() { + return value; + } + + public Schema valueSchema() { + return valueSchema; + } + + @Override + public String toString() { + return "ConnectRecord{" + + "topic='" + topic + '\'' + + ", kafkaPartition=" + kafkaPartition + + ", key=" + key + + ", value=" + value + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ConnectRecord that = (ConnectRecord) o; + + if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null) + return false; + if (topic != null ? !topic.equals(that.topic) : that.topic != null) + return false; + if (keySchema != null ? !keySchema.equals(that.keySchema) : that.keySchema != null) + return false; + if (key != null ? !key.equals(that.key) : that.key != null) + return false; + if (valueSchema != null ? !valueSchema.equals(that.valueSchema) : that.valueSchema != null) + return false; + if (value != null ? 
!value.equals(that.value) : that.value != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = topic != null ? topic.hashCode() : 0; + result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0); + result = 31 * result + (keySchema != null ? keySchema.hashCode() : 0); + result = 31 * result + (key != null ? key.hashCode() : 0); + result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0); + result = 31 * result + (value != null ? value.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java new file mode 100644 index 0000000..934cdbd --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.kafka.connect.connector;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.List;
import java.util.Map;

/**
 * <p>
 * Base class for components that integrate Kafka Connect with an external system, either
 * ingesting data into Kafka (source) or delivering data from Kafka (sink). Implementations
 * should not extend this class directly; they should extend SourceConnector or SinkConnector.
 * </p>
 * <p>
 * A Connector has two responsibilities. First, given its own configuration, it produces the
 * configurations for a set of {@link Task}s that partition the copying work (e.g. a database
 * connector might divide its tables evenly across tasks). Second, it monitors the external
 * system for changes that require reconfiguration and reports them to the Kafka Connect
 * runtime through the ConnectorContext (e.g. new tables appearing or disappearing), after
 * which the runtime requests fresh task configurations and updates the running Tasks.
 * </p>
 */
@InterfaceStability.Unstable
public abstract class Connector {

    /** Runtime context used to notify Kafka Connect of input/output changes. */
    protected ConnectorContext context;

    /**
     * Get the version of this connector.
     *
     * @return the version, formatted as a String
     */
    public abstract String version();

    /**
     * Initialize this connector with a context for communicating with the runtime.
     *
     * @param ctx context object used to interact with the Kafka Connect runtime
     */
    public void initialize(ConnectorContext ctx) {
        this.context = ctx;
    }

    /**
     * <p>
     * Initialize this connector with a runtime context and the task configurations that were
     * in use before a failure. This variant is only invoked during recovery.
     * </p>
     * <p>
     * The default implementation discards {@code taskConfigs}; the runtime will simply request
     * a fresh set of configurations and update the running Tasks accordingly. Connectors may
     * override this to reuse the old assignments and avoid unnecessary task churn.
     * </p>
     *
     * @param ctx         context object used to interact with the Kafka Connect runtime
     * @param taskConfigs existing task configurations, which may be consulted when generating
     *                    new task configs to keep partition-to-task assignments stable
     */
    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
        this.context = ctx;
        // taskConfigs is intentionally ignored here. That may cause extra task churn during
        // recovery if the regenerated configs differ, but it keeps Connector implementations simple.
    }

    /**
     * Start this Connector. Only called on a clean instance: either freshly constructed and
     * initialized, or after {@link #stop()} has completed.
     *
     * @param props configuration settings
     */
    public abstract void start(Map<String, String> props);

    /**
     * Reconfigure this Connector. The default implementation is a full restart —
     * {@link #stop()} followed by {@link #start(Map)} — which suffices for most connectors.
     * Override only to handle reconfiguration more efficiently, e.g. without tearing down
     * network connections to the external system.
     *
     * @param props new configuration settings
     */
    public void reconfigure(Map<String, String> props) {
        stop();
        start(props);
    }

    /**
     * Returns the {@link Task} implementation class for this Connector.
     */
    public abstract Class<? extends Task> taskClass();

    /**
     * Produce configurations for Tasks based on the current connector configuration, returning
     * at most {@code maxTasks} entries.
     *
     * @param maxTasks maximum number of configurations to generate
     * @return configurations for Tasks
     */
    public abstract List<Map<String, String>> taskConfigs(int maxTasks);

    /**
     * Stop this connector.
     */
    public abstract void stop();
}
// ---- File: connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java ----
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.apache.kafka.connect.connector;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * ConnectorContext lets a Connector proactively signal the Kafka Connect runtime.
 */
@InterfaceStability.Unstable
public interface ConnectorContext {
    /**
     * Ask the runtime to regenerate and redeploy the Task configurations for this connector.
     * Invoke this when something about the input/output has changed (e.g. partitions were
     * added or removed) and the currently running Tasks need to be adjusted.
     */
    void requestTaskReconfiguration();
}

// ---- File: connect/api/src/main/java/org/apache/kafka/connect/connector/Task.java ----
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.apache.kafka.connect.connector;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;

/**
 * <p>
 * A Task holds the code that actually copies data to or from another system. Each Task
 * receives a configuration from its parent Connector assigning it a slice of the job's
 * work; the Kafka Connect framework then pushes data to / pulls data from the Task.
 * Tasks must also be able to respond to reconfiguration requests.
 * </p>
 * <p>
 * This interface carries only the functionality shared by
 * {@link org.apache.kafka.connect.source.SourceTask} and
 * {@link org.apache.kafka.connect.sink.SinkTask}.
 * </p>
 */
@InterfaceStability.Unstable
public interface Task {
    /**
     * Get the version of this task, normally matching the owning {@link Connector}'s version.
     *
     * @return the version, formatted as a String
     */
    String version();

    /**
     * Start the Task.
     *
     * @param props initial configuration
     */
    void start(Map<String, String> props);

    /**
     * Stop this task.
     */
    void stop();
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.apache.kafka.connect.data;

import org.apache.kafka.connect.errors.DataException;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Concrete {@link Schema} implementation used throughout Kafka Connect to describe the
 * type of data, including primitive, array, map, and struct types as well as logical types
 * layered on top of them.
 */
public class ConnectSchema implements Schema {
    /**
     * Maps Schema.Types to a list of Java classes that can be used to represent them.
     */
    private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();
    /**
     * Maps known logical types to a list of Java classes that can be used to represent them.
     */
    private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new HashMap<>();

    /**
     * Maps the Java classes to the corresponding Schema.Type.
     */
    private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();

    static {
        SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class));
        SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) Short.class));
        SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) Integer.class));
        SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class));
        SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) Float.class));
        SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) Double.class));
        SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) Boolean.class));
        SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) String.class));
        // Bytes are special and have 2 representations. byte[] causes problems because it doesn't handle equals() and
        // hashCode() like we want objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause
        // those methods to fail, so ByteBuffers are recommended
        SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
        SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class));
        SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class));
        SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) Struct.class));

        // Invert SCHEMA_TYPE_CLASSES so schemaless data can be mapped from a Java class back to a Type.
        for (Map.Entry<Type, List<Class>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
            for (Class<?> schemaClass : schemaClasses.getValue())
                JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
        }

        LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Arrays.asList((Class) BigDecimal.class));
        LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
        LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
        LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
        // We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since that's only used to determine schemas for
        // schemaless data and logical types will have ambiguous schemas (e.g. many of them use the same Java class) so
        // they should not be used without schemas.
    }

    // The type of the field
    private final Type type;
    private final boolean optional;
    private final Object defaultValue;

    // Only populated for STRUCT schemas; fieldsByName is a lookup index over fields.
    private final List<Field> fields;
    private final Map<String, Field> fieldsByName;

    // Only meaningful for MAP (keySchema, valueSchema) and ARRAY (valueSchema) types.
    private final Schema keySchema;
    private final Schema valueSchema;

    // Optional name and version provide a built-in way to indicate what type of data is included. Most
    // useful for structs to indicate the semantics of the struct and map it to some existing underlying
    // serializer-specific schema. However, can also be useful in specifying other logical types (e.g. a set is an array
    // with additional constraints).
    private final String name;
    private final Integer version;
    // Optional human readable documentation describing this schema.
    private final String doc;
    private final Map<String, String> parameters;

    /**
     * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
     */
    public ConnectSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc, Map<String, String> parameters, List<Field> fields, Schema keySchema, Schema valueSchema) {
        this.type = type;
        this.optional = optional;
        this.defaultValue = defaultValue;
        this.name = name;
        this.version = version;
        this.doc = doc;
        this.parameters = parameters;

        this.fields = fields;
        if (this.fields != null && this.type == Type.STRUCT) {
            this.fieldsByName = new HashMap<>();
            for (Field field : fields)
                fieldsByName.put(field.name(), field);
        } else {
            this.fieldsByName = null;
        }

        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
    }

    /**
     * Construct a Schema for a primitive type, setting schema parameters, struct fields, and key and value schemas to null.
     */
    public ConnectSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc) {
        this(type, optional, defaultValue, name, version, doc, null, null, null, null);
    }

    /**
     * Construct a default schema for a primitive type. The schema is required, has no default value, name, version,
     * or documentation.
     */
    public ConnectSchema(Type type) {
        this(type, false, null, null, null, null);
    }

    @Override
    public Type type() {
        return type;
    }

    @Override
    public boolean isOptional() {
        return optional;
    }

    @Override
    public Object defaultValue() {
        return defaultValue;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public Integer version() {
        return version;
    }

    @Override
    public String doc() {
        return doc;
    }

    @Override
    public Map<String, String> parameters() {
        return parameters;
    }

    @Override
    public List<Field> fields() {
        if (type != Type.STRUCT)
            throw new DataException("Cannot list fields on non-struct type");
        return fields;
    }

    @Override
    public Field field(String fieldName) {
        if (type != Type.STRUCT)
            throw new DataException("Cannot look up fields on non-struct type");
        return fieldsByName.get(fieldName);
    }

    @Override
    public Schema keySchema() {
        if (type != Type.MAP)
            throw new DataException("Cannot look up key schema on non-map type");
        return keySchema;
    }

    @Override
    public Schema valueSchema() {
        if (type != Type.MAP && type != Type.ARRAY)
            throw new DataException("Cannot look up value schema on non-array and non-map type");
        return valueSchema;
    }

    /**
     * Validate that the value can be used with the schema, i.e. that its type matches the schema type and nullability
     * requirements. Throws a DataException if the value is invalid.
     *
     * @param schema Schema to test
     * @param value value to test
     * @throws DataException if the value does not conform to the schema
     */
    public static void validateValue(Schema schema, Object value) {
        if (value == null) {
            if (!schema.isOptional())
                throw new DataException("Invalid value: null used for required field");
            else
                return;
        }

        // Logical-type schemas (identified by name) constrain the Java class more specifically
        // than the underlying primitive type, so check them first.
        List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());

        if (expectedClasses == null)
            expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());

        if (expectedClasses == null)
            throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());

        boolean foundMatch = false;
        for (Class<?> expectedClass : expectedClasses) {
            if (expectedClass.isInstance(value)) {
                foundMatch = true;
                break;
            }
        }
        if (!foundMatch)
            throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());

        // For composite types, recursively validate the contained values as well.
        switch (schema.type()) {
            case STRUCT:
                Struct struct = (Struct) value;
                if (!struct.schema().equals(schema))
                    throw new DataException("Struct schemas do not match.");
                struct.validate();
                break;
            case ARRAY:
                List<?> array = (List<?>) value;
                for (Object entry : array)
                    validateValue(schema.valueSchema(), entry);
                break;
            case MAP:
                Map<?, ?> map = (Map<?, ?>) value;
                for (Map.Entry<?, ?> entry : map.entrySet()) {
                    validateValue(schema.keySchema(), entry.getKey());
                    validateValue(schema.valueSchema(), entry.getValue());
                }
                break;
        }
    }

    /**
     * Validate that the value can be used for this schema, i.e. that its type matches the schema type and optional
     * requirements. Throws a DataException if the value is invalid.
     *
     * @param value the value to validate
     * @throws DataException if the value does not conform to this schema
     */
    public void validateValue(Object value) {
        validateValue(this, value);
    }

    @Override
    public ConnectSchema schema() {
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConnectSchema schema = (ConnectSchema) o;
        return Objects.equals(optional, schema.optional) &&
                Objects.equals(type, schema.type) &&
                Objects.equals(defaultValue, schema.defaultValue) &&
                Objects.equals(fields, schema.fields) &&
                Objects.equals(keySchema, schema.keySchema) &&
                Objects.equals(valueSchema, schema.valueSchema) &&
                Objects.equals(name, schema.name) &&
                Objects.equals(version, schema.version) &&
                Objects.equals(doc, schema.doc) &&
                Objects.equals(parameters, schema.parameters);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc, parameters);
    }

    @Override
    public String toString() {
        if (name != null)
            return "Schema{" + name + ":" + type + "}";
        else
            return "Schema{" + type + "}";
    }

    /**
     * Get the {@link Type} associated with the given class.
     *
     * @param klass the Class to look up a schema type for
     * @return the corresponding type, or null if there is no matching type
     */
    public static Type schemaType(Class<?> klass) {
        synchronized (JAVA_CLASS_SCHEMA_TYPES) {
            Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass);
            if (schemaType != null)
                return schemaType;

            // Since the lookup only checks the exact class, also try to match klass as a
            // subclass/implementation of each known class (e.g. an ArrayList for List).
            for (Map.Entry<Class<?>, Type> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) {
                try {
                    klass.asSubclass(entry.getKey());
                    // Cache this for subsequent lookups
                    JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue());
                    return entry.getValue();
                } catch (ClassCastException e) {
                    // Expected when klass is not a subclass of this entry; try the next one.
                }
            }
        }
        return null;
    }
}