KAFKA-2774: Rename Copycat to Kafka Connect

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Gwen Shapira

Closes #456 from ewencp/kafka-2774-rename-copycat

(cherry picked from commit f2031d40639ef34c1591c22971394ef41c87652c)
Signed-off-by: Gwen Shapira <csh...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/417e283d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/417e283d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/417e283d

Branch: refs/heads/0.9.0
Commit: 417e283d643d8865aa3e79dffa373c8cc853d78f
Parents: 4801322
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Sun Nov 8 22:11:03 2015 -0800
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Sun Nov 8 22:11:26 2015 -0800

----------------------------------------------------------------------
 bin/connect-distributed.sh                      |  23 +
 bin/connect-standalone.sh                       |  23 +
 bin/copycat-distributed.sh                      |  23 -
 bin/copycat-standalone.sh                       |  23 -
 bin/kafka-run-class.sh                          |   6 +-
 build.gradle                                    |  48 +-
 checkstyle/import-control.xml                   |  30 +-
 .../consumer/internals/AbstractCoordinator.java |   2 +-
 config/connect-console-sink.properties          |  19 +
 config/connect-console-source.properties        |  19 +
 config/connect-distributed.properties           |  42 +
 config/connect-file-sink.properties             |  20 +
 config/connect-file-source.properties           |  20 +
 config/connect-log4j.properties                 |  23 +
 config/connect-standalone.properties            |  37 +
 config/copycat-console-sink.properties          |  19 -
 config/copycat-console-source.properties        |  19 -
 config/copycat-distributed.properties           |  42 -
 config/copycat-file-sink.properties             |  20 -
 config/copycat-file-source.properties           |  20 -
 config/copycat-log4j.properties                 |  23 -
 config/copycat-standalone.properties            |  37 -
 .../kafka/connect/connector/ConnectRecord.java  | 122 +++
 .../kafka/connect/connector/Connector.java      | 124 +++
 .../connect/connector/ConnectorContext.java     |  33 +
 .../apache/kafka/connect/connector/Task.java    |  56 ++
 .../kafka/connect/data/ConnectSchema.java       | 323 +++++++
 .../org/apache/kafka/connect/data/Date.java     |  76 ++
 .../org/apache/kafka/connect/data/Decimal.java  |  87 ++
 .../org/apache/kafka/connect/data/Field.java    |  77 ++
 .../org/apache/kafka/connect/data/Schema.java   | 163 ++++
 .../kafka/connect/data/SchemaAndValue.java      |  62 ++
 .../kafka/connect/data/SchemaBuilder.java       | 412 +++++++++
 .../kafka/connect/data/SchemaProjector.java     | 197 ++++
 .../org/apache/kafka/connect/data/Struct.java   | 265 ++++++
 .../org/apache/kafka/connect/data/Time.java     |  77 ++
 .../apache/kafka/connect/data/Timestamp.java    |  64 ++
 .../kafka/connect/errors/ConnectException.java  |  40 +
 .../kafka/connect/errors/DataException.java     |  35 +
 .../errors/IllegalWorkerStateException.java     |  35 +
 .../connect/errors/SchemaBuilderException.java  |  32 +
 .../errors/SchemaProjectorException.java        |  29 +
 .../kafka/connect/sink/SinkConnector.java       |  40 +
 .../apache/kafka/connect/sink/SinkRecord.java   |  72 ++
 .../org/apache/kafka/connect/sink/SinkTask.java | 107 +++
 .../kafka/connect/sink/SinkTaskContext.java     |  82 ++
 .../kafka/connect/source/SourceConnector.java   |  29 +
 .../kafka/connect/source/SourceRecord.java      | 109 +++
 .../apache/kafka/connect/source/SourceTask.java |  82 ++
 .../kafka/connect/source/SourceTaskContext.java |  32 +
 .../apache/kafka/connect/storage/Converter.java |  57 ++
 .../connect/storage/OffsetStorageReader.java    |  65 ++
 .../kafka/connect/storage/StringConverter.java  |  81 ++
 .../kafka/connect/util/ConnectorUtils.java      |  66 ++
 .../connector/ConnectorReconfigurationTest.java |  82 ++
 .../kafka/connect/data/ConnectSchemaTest.java   | 303 ++++++
 .../org/apache/kafka/connect/data/DateTest.java |  78 ++
 .../apache/kafka/connect/data/DecimalTest.java  |  63 ++
 .../apache/kafka/connect/data/FieldTest.java    |  40 +
 .../kafka/connect/data/SchemaBuilderTest.java   | 305 ++++++
 .../kafka/connect/data/SchemaProjectorTest.java | 495 ++++++++++
 .../apache/kafka/connect/data/StructTest.java   | 222 +++++
 .../org/apache/kafka/connect/data/TimeTest.java |  80 ++
 .../kafka/connect/data/TimestampTest.java       |  75 ++
 .../connect/storage/StringConverterTest.java    |  83 ++
 .../kafka/connect/util/ConnectorUtilsTest.java  |  67 ++
 .../connect/file/FileStreamSinkConnector.java   |  69 ++
 .../kafka/connect/file/FileStreamSinkTask.java  |  94 ++
 .../connect/file/FileStreamSourceConnector.java |  77 ++
 .../connect/file/FileStreamSourceTask.java      | 216 +++++
 .../file/FileStreamSinkConnectorTest.java       |  86 ++
 .../connect/file/FileStreamSinkTaskTest.java    |  69 ++
 .../file/FileStreamSourceConnectorTest.java     | 105 +++
 .../connect/file/FileStreamSourceTaskTest.java  | 150 +++
 .../kafka/connect/json/JsonConverter.java       | 735 +++++++++++++++
 .../kafka/connect/json/JsonDeserializer.java    |  62 ++
 .../apache/kafka/connect/json/JsonSchema.java   |  82 ++
 .../kafka/connect/json/JsonSerializer.java      |  60 ++
 .../kafka/connect/json/JsonConverterTest.java   | 644 +++++++++++++
 .../kafka/connect/cli/ConnectDistributed.java   |  67 ++
 .../kafka/connect/cli/ConnectStandalone.java    |  98 ++
 .../connect/errors/AlreadyExistsException.java  |  35 +
 .../kafka/connect/errors/NotFoundException.java |  35 +
 .../connect/errors/RetriableException.java      |  35 +
 .../apache/kafka/connect/runtime/Connect.java   |  99 ++
 .../kafka/connect/runtime/ConnectorConfig.java  |  73 ++
 .../apache/kafka/connect/runtime/Herder.java    | 148 +++
 .../connect/runtime/HerderConnectorContext.java |  42 +
 .../runtime/SourceTaskOffsetCommitter.java      | 139 +++
 .../kafka/connect/runtime/TaskConfig.java       |  54 ++
 .../apache/kafka/connect/runtime/Worker.java    | 331 +++++++
 .../kafka/connect/runtime/WorkerConfig.java     | 138 +++
 .../kafka/connect/runtime/WorkerSinkTask.java   | 370 ++++++++
 .../connect/runtime/WorkerSinkTaskContext.java  | 111 +++
 .../connect/runtime/WorkerSinkTaskThread.java   | 116 +++
 .../kafka/connect/runtime/WorkerSourceTask.java | 339 +++++++
 .../runtime/WorkerSourceTaskContext.java        |  35 +
 .../kafka/connect/runtime/WorkerTask.java       |  54 ++
 .../runtime/distributed/ClusterConfigState.java | 145 +++
 .../runtime/distributed/ConnectProtocol.java    | 269 ++++++
 .../runtime/distributed/DistributedConfig.java  | 187 ++++
 .../runtime/distributed/DistributedHerder.java  | 920 +++++++++++++++++++
 .../runtime/distributed/NotLeaderException.java |  47 +
 .../runtime/distributed/WorkerCoordinator.java  | 294 ++++++
 .../runtime/distributed/WorkerGroupMember.java  | 185 ++++
 .../distributed/WorkerRebalanceListener.java    |  38 +
 .../kafka/connect/runtime/rest/RestServer.java  | 258 ++++++
 .../runtime/rest/entities/ConnectorInfo.java    |  81 ++
 .../rest/entities/CreateConnectorRequest.java   |  59 ++
 .../runtime/rest/entities/ErrorMessage.java     |  63 ++
 .../runtime/rest/entities/ServerInfo.java       |  41 +
 .../connect/runtime/rest/entities/TaskInfo.java |  58 ++
 .../rest/errors/ConnectExceptionMapper.java     |  60 ++
 .../rest/errors/ConnectRestException.java       |  70 ++
 .../rest/resources/ConnectorsResource.java      | 201 ++++
 .../runtime/rest/resources/RootResource.java    |  36 +
 .../runtime/standalone/StandaloneConfig.java    |  35 +
 .../runtime/standalone/StandaloneHerder.java    | 272 ++++++
 .../connect/storage/FileOffsetBackingStore.java | 102 ++
 .../connect/storage/KafkaConfigStorage.java     | 578 ++++++++++++
 .../storage/KafkaOffsetBackingStore.java        | 213 +++++
 .../storage/MemoryOffsetBackingStore.java       | 105 +++
 .../connect/storage/OffsetBackingStore.java     |  72 ++
 .../storage/OffsetStorageReaderImpl.java        | 110 +++
 .../connect/storage/OffsetStorageWriter.java    | 207 +++++
 .../kafka/connect/storage/OffsetUtils.java      |  54 ++
 .../org/apache/kafka/connect/util/Callback.java |  31 +
 .../kafka/connect/util/ConnectorTaskId.java     |  85 ++
 .../connect/util/ConvertingFutureCallback.java  |  85 ++
 .../kafka/connect/util/FutureCallback.java      |  34 +
 .../kafka/connect/util/KafkaBasedLog.java       | 331 +++++++
 .../kafka/connect/util/ShutdownableThread.java  | 145 +++
 .../connect/runtime/WorkerSinkTaskTest.java     | 208 +++++
 .../runtime/WorkerSinkTaskThreadedTest.java     | 563 ++++++++++++
 .../connect/runtime/WorkerSourceTaskTest.java   | 308 +++++++
 .../kafka/connect/runtime/WorkerTest.java       | 397 ++++++++
 .../distributed/DistributedHerderTest.java      | 573 ++++++++++++
 .../distributed/WorkerCoordinatorTest.java      | 443 +++++++++
 .../rest/resources/ConnectorsResourceTest.java  | 364 ++++++++
 .../standalone/StandaloneHerderTest.java        | 337 +++++++
 .../storage/FileOffsetBackingStoreTest.java     | 117 +++
 .../connect/storage/KafkaConfigStorageTest.java | 522 +++++++++++
 .../storage/KafkaOffsetBackingStoreTest.java    | 357 +++++++
 .../storage/OffsetStorageWriterTest.java        | 272 ++++++
 .../util/ByteArrayProducerRecordEquals.java     |  53 ++
 .../kafka/connect/util/KafkaBasedLogTest.java   | 437 +++++++++
 .../org/apache/kafka/connect/util/MockTime.java |  49 +
 .../connect/util/ShutdownableThreadTest.java    |  72 ++
 .../TestBackgroundThreadExceptionHandler.java   |  37 +
 .../apache/kafka/connect/util/TestFuture.java   | 161 ++++
 .../apache/kafka/connect/util/ThreadedTest.java |  43 +
 .../runtime/src/test/resources/log4j.properties |  23 +
 .../kafka/copycat/connector/Connector.java      | 124 ---
 .../copycat/connector/ConnectorContext.java     |  33 -
 .../kafka/copycat/connector/CopycatRecord.java  | 122 ---
 .../apache/kafka/copycat/connector/Task.java    |  56 --
 .../kafka/copycat/data/CopycatSchema.java       | 323 -------
 .../org/apache/kafka/copycat/data/Date.java     |  76 --
 .../org/apache/kafka/copycat/data/Decimal.java  |  87 --
 .../org/apache/kafka/copycat/data/Field.java    |  77 --
 .../org/apache/kafka/copycat/data/Schema.java   | 163 ----
 .../kafka/copycat/data/SchemaAndValue.java      |  62 --
 .../kafka/copycat/data/SchemaBuilder.java       | 412 ---------
 .../kafka/copycat/data/SchemaProjector.java     | 197 ----
 .../org/apache/kafka/copycat/data/Struct.java   | 265 ------
 .../org/apache/kafka/copycat/data/Time.java     |  77 --
 .../apache/kafka/copycat/data/Timestamp.java    |  64 --
 .../kafka/copycat/errors/CopycatException.java  |  40 -
 .../kafka/copycat/errors/DataException.java     |  35 -
 .../errors/IllegalWorkerStateException.java     |  35 -
 .../copycat/errors/SchemaBuilderException.java  |  32 -
 .../errors/SchemaProjectorException.java        |  29 -
 .../kafka/copycat/sink/SinkConnector.java       |  40 -
 .../apache/kafka/copycat/sink/SinkRecord.java   |  72 --
 .../org/apache/kafka/copycat/sink/SinkTask.java | 107 ---
 .../kafka/copycat/sink/SinkTaskContext.java     |  82 --
 .../kafka/copycat/source/SourceConnector.java   |  29 -
 .../kafka/copycat/source/SourceRecord.java      | 109 ---
 .../apache/kafka/copycat/source/SourceTask.java |  82 --
 .../kafka/copycat/source/SourceTaskContext.java |  32 -
 .../apache/kafka/copycat/storage/Converter.java |  57 --
 .../copycat/storage/OffsetStorageReader.java    |  65 --
 .../kafka/copycat/storage/StringConverter.java  |  81 --
 .../kafka/copycat/util/ConnectorUtils.java      |  66 --
 .../connector/ConnectorReconfigurationTest.java |  82 --
 .../kafka/copycat/data/CopycatSchemaTest.java   | 303 ------
 .../org/apache/kafka/copycat/data/DateTest.java |  78 --
 .../apache/kafka/copycat/data/DecimalTest.java  |  63 --
 .../apache/kafka/copycat/data/FieldTest.java    |  40 -
 .../kafka/copycat/data/SchemaBuilderTest.java   | 305 ------
 .../kafka/copycat/data/SchemaProjectorTest.java | 495 ----------
 .../apache/kafka/copycat/data/StructTest.java   | 222 -----
 .../org/apache/kafka/copycat/data/TimeTest.java |  80 --
 .../kafka/copycat/data/TimestampTest.java       |  75 --
 .../copycat/storage/StringConverterTest.java    |  83 --
 .../kafka/copycat/util/ConnectorUtilsTest.java  |  67 --
 .../copycat/file/FileStreamSinkConnector.java   |  69 --
 .../kafka/copycat/file/FileStreamSinkTask.java  |  94 --
 .../copycat/file/FileStreamSourceConnector.java |  77 --
 .../copycat/file/FileStreamSourceTask.java      | 216 -----
 .../file/FileStreamSinkConnectorTest.java       |  86 --
 .../copycat/file/FileStreamSinkTaskTest.java    |  69 --
 .../file/FileStreamSourceConnectorTest.java     | 105 ---
 .../copycat/file/FileStreamSourceTaskTest.java  | 150 ---
 .../kafka/copycat/json/JsonConverter.java       | 735 ---------------
 .../kafka/copycat/json/JsonDeserializer.java    |  62 --
 .../apache/kafka/copycat/json/JsonSchema.java   |  82 --
 .../kafka/copycat/json/JsonSerializer.java      |  60 --
 .../kafka/copycat/json/JsonConverterTest.java   | 644 -------------
 .../kafka/copycat/cli/CopycatDistributed.java   |  67 --
 .../kafka/copycat/cli/CopycatStandalone.java    |  98 --
 .../copycat/errors/AlreadyExistsException.java  |  35 -
 .../kafka/copycat/errors/NotFoundException.java |  35 -
 .../copycat/errors/RetriableException.java      |  35 -
 .../kafka/copycat/runtime/ConnectorConfig.java  |  74 --
 .../apache/kafka/copycat/runtime/Copycat.java   |  99 --
 .../apache/kafka/copycat/runtime/Herder.java    | 148 ---
 .../copycat/runtime/HerderConnectorContext.java |  42 -
 .../runtime/SourceTaskOffsetCommitter.java      | 139 ---
 .../kafka/copycat/runtime/TaskConfig.java       |  54 --
 .../apache/kafka/copycat/runtime/Worker.java    | 331 -------
 .../kafka/copycat/runtime/WorkerConfig.java     | 138 ---
 .../kafka/copycat/runtime/WorkerSinkTask.java   | 370 --------
 .../copycat/runtime/WorkerSinkTaskContext.java  | 111 ---
 .../copycat/runtime/WorkerSinkTaskThread.java   | 116 ---
 .../kafka/copycat/runtime/WorkerSourceTask.java | 339 -------
 .../runtime/WorkerSourceTaskContext.java        |  35 -
 .../kafka/copycat/runtime/WorkerTask.java       |  54 --
 .../runtime/distributed/ClusterConfigState.java | 145 ---
 .../runtime/distributed/CopycatProtocol.java    | 269 ------
 .../runtime/distributed/DistributedConfig.java  | 187 ----
 .../runtime/distributed/DistributedHerder.java  | 920 -------------------
 .../runtime/distributed/NotLeaderException.java |  47 -
 .../runtime/distributed/WorkerCoordinator.java  | 293 ------
 .../runtime/distributed/WorkerGroupMember.java  | 185 ----
 .../distributed/WorkerRebalanceListener.java    |  38 -
 .../kafka/copycat/runtime/rest/RestServer.java  | 258 ------
 .../runtime/rest/entities/ConnectorInfo.java    |  81 --
 .../rest/entities/CreateConnectorRequest.java   |  59 --
 .../runtime/rest/entities/ErrorMessage.java     |  63 --
 .../runtime/rest/entities/ServerInfo.java       |  41 -
 .../copycat/runtime/rest/entities/TaskInfo.java |  58 --
 .../rest/errors/CopycatExceptionMapper.java     |  60 --
 .../rest/errors/CopycatRestException.java       |  70 --
 .../rest/resources/ConnectorsResource.java      | 201 ----
 .../runtime/rest/resources/RootResource.java    |  36 -
 .../runtime/standalone/StandaloneConfig.java    |  35 -
 .../runtime/standalone/StandaloneHerder.java    | 272 ------
 .../copycat/storage/FileOffsetBackingStore.java | 102 --
 .../copycat/storage/KafkaConfigStorage.java     | 578 ------------
 .../storage/KafkaOffsetBackingStore.java        | 213 -----
 .../storage/MemoryOffsetBackingStore.java       | 105 ---
 .../copycat/storage/OffsetBackingStore.java     |  72 --
 .../storage/OffsetStorageReaderImpl.java        | 110 ---
 .../copycat/storage/OffsetStorageWriter.java    | 207 -----
 .../kafka/copycat/storage/OffsetUtils.java      |  54 --
 .../org/apache/kafka/copycat/util/Callback.java |  31 -
 .../kafka/copycat/util/ConnectorTaskId.java     |  85 --
 .../copycat/util/ConvertingFutureCallback.java  |  85 --
 .../kafka/copycat/util/FutureCallback.java      |  34 -
 .../kafka/copycat/util/KafkaBasedLog.java       | 331 -------
 .../kafka/copycat/util/ShutdownableThread.java  | 145 ---
 .../copycat/runtime/WorkerSinkTaskTest.java     | 208 -----
 .../runtime/WorkerSinkTaskThreadedTest.java     | 563 ------------
 .../copycat/runtime/WorkerSourceTaskTest.java   | 308 -------
 .../kafka/copycat/runtime/WorkerTest.java       | 397 --------
 .../distributed/DistributedHerderTest.java      | 573 ------------
 .../distributed/WorkerCoordinatorTest.java      | 443 ---------
 .../rest/resources/ConnectorsResourceTest.java  | 364 --------
 .../standalone/StandaloneHerderTest.java        | 337 -------
 .../storage/FileOffsetBackingStoreTest.java     | 117 ---
 .../copycat/storage/KafkaConfigStorageTest.java | 522 -----------
 .../storage/KafkaOffsetBackingStoreTest.java    | 357 -------
 .../storage/OffsetStorageWriterTest.java        | 272 ------
 .../util/ByteArrayProducerRecordEquals.java     |  53 --
 .../kafka/copycat/util/KafkaBasedLogTest.java   | 437 ---------
 .../org/apache/kafka/copycat/util/MockTime.java |  49 -
 .../copycat/util/ShutdownableThreadTest.java    |  72 --
 .../TestBackgroundThreadExceptionHandler.java   |  37 -
 .../apache/kafka/copycat/util/TestFuture.java   | 161 ----
 .../apache/kafka/copycat/util/ThreadedTest.java |  43 -
 .../runtime/src/test/resources/log4j.properties |  23 -
 .../GroupCoordinatorResponseTest.scala          |   2 +-
 docs/security.html                              |   2 +-
 settings.gradle                                 |   2 +-
 tests/kafkatest/services/connect.py             | 191 ++++
 tests/kafkatest/services/copycat.py             | 191 ----
 .../kafkatest/tests/connect_distributed_test.py |  97 ++
 tests/kafkatest/tests/connect_rest_test.py      | 163 ++++
 tests/kafkatest/tests/connect_test.py           |  93 ++
 .../kafkatest/tests/copycat_distributed_test.py |  97 --
 tests/kafkatest/tests/copycat_rest_test.py      | 163 ----
 tests/kafkatest/tests/copycat_test.py           |  93 --
 .../templates/connect-distributed.properties    |  40 +
 .../templates/connect-file-sink.properties      |  20 +
 .../templates/connect-file-source.properties    |  20 +
 .../templates/connect-standalone.properties     |  32 +
 .../templates/copycat-distributed.properties    |  40 -
 .../templates/copycat-file-sink.properties      |  20 -
 .../templates/copycat-file-source.properties    |  20 -
 .../templates/copycat-standalone.properties     |  32 -
 301 files changed, 21527 insertions(+), 21527 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/connect-distributed.sh
----------------------------------------------------------------------
diff --git a/bin/connect-distributed.sh b/bin/connect-distributed.sh
new file mode 100755
index 0000000..d9f6325
--- /dev/null
+++ b/bin/connect-distributed.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+
+if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
+    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.connect.cli.ConnectDistributed "$@"

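The new script simply delegates to kafka-run-class.sh; a minimal usage sketch, assuming a built distribution and the worker config file added later in this commit:

    # start a distributed Connect worker
    bin/connect-distributed.sh config/connect-distributed.properties
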
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/connect-standalone.sh
----------------------------------------------------------------------
diff --git a/bin/connect-standalone.sh b/bin/connect-standalone.sh
new file mode 100755
index 0000000..a14df86
--- /dev/null
+++ b/bin/connect-standalone.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+
+if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
+    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.connect.cli.ConnectStandalone "$@"

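The standalone wrapper is invoked the same way, except that ConnectStandalone also accepts connector property files after the worker config; a sketch using the example configs added by this commit:

    # run a standalone worker with the bundled file source and sink connectors
    bin/connect-standalone.sh config/connect-standalone.properties \
        config/connect-file-source.properties config/connect-file-sink.properties
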
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/copycat-distributed.sh
----------------------------------------------------------------------
diff --git a/bin/copycat-distributed.sh b/bin/copycat-distributed.sh
deleted file mode 100755
index 4d62300..0000000
--- a/bin/copycat-distributed.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/sh
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-base_dir=$(dirname $0)
-
-if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
-    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties"
-fi
-
-exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatDistributed "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/copycat-standalone.sh
----------------------------------------------------------------------
diff --git a/bin/copycat-standalone.sh b/bin/copycat-standalone.sh
deleted file mode 100755
index b219f8a..0000000
--- a/bin/copycat-standalone.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/sh
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-base_dir=$(dirname $0)
-
-if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
-    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties"
-fi
-
-exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatStandalone "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index cfddae0..b18a9cf 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -74,12 +74,12 @@ done
 
 for cc_pkg in "api" "runtime" "file" "json"
 do
-  for file in $base_dir/copycat/${cc_pkg}/build/libs/copycat-${cc_pkg}*.jar;
+  for file in $base_dir/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
   do
     CLASSPATH=$CLASSPATH:$file
   done
-  if [ -d "$base_dir/copycat/${cc_pkg}/build/dependant-libs" ] ; then
-    CLASSPATH=$CLASSPATH:$base_dir/copycat/${cc_pkg}/build/dependant-libs/*
+  if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
+    CLASSPATH=$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*
   fi
 done
 

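After this change the classpath loop resolves the renamed module directories and jar prefixes; illustratively (the version number is hypothetical), the entries it appends look like:

    CLASSPATH=$CLASSPATH:$base_dir/connect/api/build/libs/connect-api-0.9.0.0.jar
    CLASSPATH=$CLASSPATH:$base_dir/connect/runtime/build/dependant-libs/*
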
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 764f9e6..49733c4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -228,17 +228,17 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
   }
 }
 
-def copycatPkgs = ['copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
-def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + copycatPkgs
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
+def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + connectPkgs
 
-tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
+tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
 tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { }
 
 tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7'] + pkgs.collect { it + ":srcJar" }) { }
 
 tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7'] + pkgs.collect { it + ":docsJar" }) { }
 
-tasks.create(name: "testCopycat", dependsOn: copycatPkgs.collect { it + ":test" }) {}
+tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {}
 tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) { }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
@@ -675,9 +675,9 @@ project(':log4j-appender') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':copycat:api') {
+project(':connect:api') {
   apply plugin: 'checkstyle'
-  archivesBaseName = "copycat-api"
+  archivesBaseName = "connect-api"
 
   dependencies {
     compile "$slf4japi"
@@ -700,7 +700,7 @@ project(':copycat:api') {
   }
 
   javadoc {
-    include "**/org/apache/kafka/copycat/*"
+    include "**/org/apache/kafka/connect/*"
   }
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
@@ -709,7 +709,7 @@ project(':copycat:api') {
     }
     from (configurations.runtime) {
       exclude('kafka-clients*')
-      exclude('copycat-*')
+      exclude('connect-*')
     }
     into "$buildDir/dependant-libs"
   }
@@ -732,12 +732,12 @@ project(':copycat:api') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':copycat:json') {
+project(':connect:json') {
   apply plugin: 'checkstyle'
-  archivesBaseName = "copycat-json"
+  archivesBaseName = "connect-json"
 
   dependencies {
-    compile project(':copycat:api')
+    compile project(':connect:api')
     compile "$slf4japi"
     compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
 
@@ -761,7 +761,7 @@ project(':copycat:json') {
   }
 
   javadoc {
-    include "**/org/apache/kafka/copycat/*"
+    include "**/org/apache/kafka/connect/*"
   }
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
@@ -770,7 +770,7 @@ project(':copycat:json') {
     }
     from (configurations.runtime) {
       exclude('kafka-clients*')
-      exclude('copycat-*')
+      exclude('connect-*')
     }
     into "$buildDir/dependant-libs"
   }
@@ -793,12 +793,12 @@ project(':copycat:json') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':copycat:runtime') {
+project(':connect:runtime') {
   apply plugin: 'checkstyle'
-  archivesBaseName = "copycat-runtime"
+  archivesBaseName = "connect-runtime"
 
   dependencies {
-    compile project(':copycat:api')
+    compile project(':connect:api')
     compile project(':clients')
     compile "$slf4japi"
 
@@ -813,7 +813,7 @@ project(':copycat:runtime') {
     testCompile "$powermock_easymock"
     testCompile project(':clients').sourceSets.test.output
     testRuntime "$slf4jlog4j"
-    testRuntime project(":copycat:json")
+    testRuntime project(":connect:json")
   }
 
   task testJar(type: Jar) {
@@ -829,7 +829,7 @@ project(':copycat:runtime') {
   }
 
   javadoc {
-    include "**/org/apache/kafka/copycat/*"
+    include "**/org/apache/kafka/connect/*"
   }
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
@@ -838,7 +838,7 @@ project(':copycat:runtime') {
     }
     from (configurations.runtime) {
       exclude('kafka-clients*')
-      exclude('copycat-*')
+      exclude('connect-*')
     }
     into "$buildDir/dependant-libs"
   }
@@ -861,12 +861,12 @@ project(':copycat:runtime') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':copycat:file') {
+project(':connect:file') {
   apply plugin: 'checkstyle'
-  archivesBaseName = "copycat-file"
+  archivesBaseName = "connect-file"
 
   dependencies {
-    compile project(':copycat:api')
+    compile project(':connect:api')
     compile "$slf4japi"
 
     testCompile "$junit"
@@ -889,7 +889,7 @@ project(':copycat:file') {
   }
 
   javadoc {
-    include "**/org/apache/kafka/copycat/*"
+    include "**/org/apache/kafka/connect/*"
   }
 
   tasks.create(name: "copyDependantLibs", type: Copy) {
@@ -898,7 +898,7 @@ project(':copycat:file') {
     }
     from (configurations.runtime) {
       exclude('kafka-clients*')
-      exclude('copycat-*')
+      exclude('connect-*')
     }
     into "$buildDir/dependant-libs"
   }

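The renamed aggregate tasks keep the old workflow; a sketch:

    # build the connect:api, connect:runtime, connect:json and connect:file jars
    ./gradlew jarConnect
    # run the test suites for the same modules
    ./gradlew testConnect
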
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 75027f5..95ea3b7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -141,26 +141,26 @@
     <allow pkg="org.bouncycastle" />
   </subpackage>
 
-  <subpackage name="copycat">
+  <subpackage name="connect">
     <allow pkg="org.apache.kafka.common" />
-    <allow pkg="org.apache.kafka.copycat.data" />
-    <allow pkg="org.apache.kafka.copycat.errors" />
+    <allow pkg="org.apache.kafka.connect.data" />
+    <allow pkg="org.apache.kafka.connect.errors" />
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.test"/>
 
     <subpackage name="source">
-      <allow pkg="org.apache.kafka.copycat.connector" />
-      <allow pkg="org.apache.kafka.copycat.storage" />
+      <allow pkg="org.apache.kafka.connect.connector" />
+      <allow pkg="org.apache.kafka.connect.storage" />
     </subpackage>
 
     <subpackage name="sink">
       <allow pkg="org.apache.kafka.clients.consumer" />
-      <allow pkg="org.apache.kafka.copycat.connector" />
-      <allow pkg="org.apache.kafka.copycat.storage" />
+      <allow pkg="org.apache.kafka.connect.connector" />
+      <allow pkg="org.apache.kafka.connect.storage" />
     </subpackage>
 
     <subpackage name="runtime">
-      <allow pkg="org.apache.kafka.copycat" />
+      <allow pkg="org.apache.kafka.connect" />
 
       <subpackage name="rest">
         <allow pkg="org.eclipse.jetty" />
@@ -172,19 +172,19 @@
     </subpackage>
 
     <subpackage name="cli">
-      <allow pkg="org.apache.kafka.copycat.runtime" />
-      <allow pkg="org.apache.kafka.copycat.storage" />
-      <allow pkg="org.apache.kafka.copycat.util" />
+      <allow pkg="org.apache.kafka.connect.runtime" />
+      <allow pkg="org.apache.kafka.connect.storage" />
+      <allow pkg="org.apache.kafka.connect.util" />
       <allow pkg="org.apache.kafka.common" />
     </subpackage>
 
     <subpackage name="storage">
-      <allow pkg="org.apache.kafka.copycat" />
+      <allow pkg="org.apache.kafka.connect" />
       <allow pkg="org.apache.kafka.common.serialization" />
     </subpackage>
 
     <subpackage name="util">
-      <allow pkg="org.apache.kafka.copycat" />
+      <allow pkg="org.apache.kafka.connect" />
       <!-- for annotations to avoid code duplication -->
       <allow pkg="com.fasterxml.jackson.annotation" />
     </subpackage>
@@ -193,11 +193,11 @@
       <allow pkg="com.fasterxml.jackson" />
       <allow pkg="org.apache.kafka.common.serialization" />
       <allow pkg="org.apache.kafka.common.errors" />
-      <allow pkg="org.apache.kafka.copycat.storage" />
+      <allow pkg="org.apache.kafka.connect.storage" />
     </subpackage>
 
     <subpackage name="file">
-      <allow pkg="org.apache.kafka.copycat" />
+      <allow pkg="org.apache.kafka.connect" />
       <allow pkg="org.apache.kafka.clients.consumer" />
       <!-- for tests -->
       <allow pkg="org.easymock" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 4d964c9..5b944d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -126,7 +126,7 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     /**
-     * Unique identifier for the class of protocols implements (e.g. "consumer" or "copycat").
+     * Unique identifier for the class of protocols implements (e.g. "consumer" or "connect").
      * @return Non-null protocol type name
      */
     protected abstract String protocolType();

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-console-sink.properties
----------------------------------------------------------------------
diff --git a/config/connect-console-sink.properties b/config/connect-console-sink.properties
new file mode 100644
index 0000000..30dbe2f
--- /dev/null
+++ b/config/connect-console-sink.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-console-sink
+connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
+tasks.max=1
+topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-console-source.properties
----------------------------------------------------------------------
diff --git a/config/connect-console-source.properties b/config/connect-console-source.properties
new file mode 100644
index 0000000..d161e4e
--- /dev/null
+++ b/config/connect-console-source.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-console-source
+connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
+tasks.max=1
+topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
new file mode 100644
index 0000000..9ec63db
--- /dev/null
+++ b/config/connect-distributed.properties
@@ -0,0 +1,42 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+# These are defaults. This file just demonstrates how to override some settings.
+bootstrap.servers=localhost:9092
+
+group.id=connect-cluster
+
+# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
+# need to configure these based on the format they want their data in when loaded from or stored into Kafka
+key.converter=org.apache.kafka.connect.json.JsonConverter
+value.converter=org.apache.kafka.connect.json.JsonConverter
+# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
+# it to
+key.converter.schemas.enable=true
+value.converter.schemas.enable=true
+
+# The internal converter used for offsets and config data is configurable and must be specified, but most users will
+# always want to use the built-in default. Offset and config data is never visible outside of Connect in this format.
+internal.key.converter=org.apache.kafka.connect.json.JsonConverter
+internal.value.converter=org.apache.kafka.connect.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.topic=connect-offsets
+# Flush much faster than normal, which is useful for testing/debugging
+offset.flush.interval.ms=10000
+config.storage.topic=connect-configs
\ No newline at end of file

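A worker started with this file manages connectors through the REST API implemented by RestServer/ConnectorsResource elsewhere in this commit; a hedged sketch, assuming the default REST port of 8083:

    # register a file source connector with the distributed cluster
    curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
        -d '{"name": "local-file-source", "config": {
              "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
              "tasks.max": "1", "file": "test.txt", "topic": "test"}}'
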
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/config/connect-file-sink.properties b/config/connect-file-sink.properties
new file mode 100644
index 0000000..17275b3
--- /dev/null
+++ b/config/connect-file-sink.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-sink
+connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
+tasks.max=1
+file=test.sink.txt
+topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/config/connect-file-source.properties b/config/connect-file-source.properties
new file mode 100644
index 0000000..31fa96c
--- /dev/null
+++ b/config/connect-file-source.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-source
+connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
+tasks.max=1
+file=test.txt
+topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-log4j.properties
----------------------------------------------------------------------
diff --git a/config/connect-log4j.properties b/config/connect-log4j.properties
new file mode 100644
index 0000000..158daed
--- /dev/null
+++ b/config/connect-log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/config/connect-standalone.properties b/config/connect-standalone.properties
new file mode 100644
index 0000000..8c4f98e
--- /dev/null
+++ b/config/connect-standalone.properties
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These are defaults. This file just demonstrates how to override some settings.
+bootstrap.servers=localhost:9092
+
+# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
+# need to configure these based on the format they want their data in when loaded from or stored into Kafka
+key.converter=org.apache.kafka.connect.json.JsonConverter
+value.converter=org.apache.kafka.connect.json.JsonConverter
+# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
+# it to
+key.converter.schemas.enable=true
+value.converter.schemas.enable=true
+
+# The internal converter used for offsets and config data is configurable and must be specified, but most users will
+# always want to use the built-in default. Offset and config data is never visible outside of Connect in this format.
+internal.key.converter=org.apache.kafka.connect.json.JsonConverter
+internal.value.converter=org.apache.kafka.connect.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+# Flush much faster than normal, which is useful for testing/debugging
+offset.flush.interval.ms=10000

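With schemas.enable=true, JsonConverter writes each record to Kafka as a schema/payload envelope; a sketch of inspecting the output topic with the console consumer (topic name and ZooKeeper address illustrative):

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    # each record appears as an envelope, e.g.:
    # {"schema":{"type":"string","optional":false},"payload":"a line from test.txt"}
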
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-console-sink.properties
----------------------------------------------------------------------
diff --git a/config/copycat-console-sink.properties b/config/copycat-console-sink.properties
deleted file mode 100644
index 4cd4c33..0000000
--- a/config/copycat-console-sink.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name=local-console-sink
-connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
-tasks.max=1
-topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-console-source.properties
----------------------------------------------------------------------
diff --git a/config/copycat-console-source.properties b/config/copycat-console-source.properties
deleted file mode 100644
index 17dbbf9..0000000
--- a/config/copycat-console-source.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name=local-console-source
-connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
-tasks.max=1
-topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties
deleted file mode 100644
index 2ea5b73..0000000
--- a/config/copycat-distributed.properties
+++ /dev/null
@@ -1,42 +0,0 @@
-##
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-# These are defaults. This file just demonstrates how to override some settings.
-bootstrap.servers=localhost:9092
-
-group.id=copycat-cluster
-
-# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
-# need to configure these based on the format they want their data in when loaded from or stored into Kafka
-key.converter=org.apache.kafka.copycat.json.JsonConverter
-value.converter=org.apache.kafka.copycat.json.JsonConverter
-# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
-# it to
-key.converter.schemas.enable=true
-value.converter.schemas.enable=true
-
-# The internal converter used for offsets and config data is configurable and must be specified, but most users will
-# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format.
-internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
-internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
-offset.storage.topic=copycat-offsets
-# Flush much faster than normal, which is useful for testing/debugging
-offset.flush.interval.ms=10000
-config.storage.topic=copycat-configs
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-file-sink.properties
----------------------------------------------------------------------
diff --git a/config/copycat-file-sink.properties b/config/copycat-file-sink.properties
deleted file mode 100644
index 3cc0d62..0000000
--- a/config/copycat-file-sink.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name=local-file-sink
-connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
-tasks.max=1
-file=test.sink.txt
-topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-file-source.properties
----------------------------------------------------------------------
diff --git a/config/copycat-file-source.properties b/config/copycat-file-source.properties
deleted file mode 100644
index 7512e50..0000000
--- a/config/copycat-file-source.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name=local-file-source
-connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
-tasks.max=1
-file=test.txt
-topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-log4j.properties
----------------------------------------------------------------------
diff --git a/config/copycat-log4j.properties b/config/copycat-log4j.properties
deleted file mode 100644
index 158daed..0000000
--- a/config/copycat-log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
-
-log4j.logger.org.apache.zookeeper=ERROR
-log4j.logger.org.I0Itec.zkclient=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/config/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties
deleted file mode 100644
index ebf689f..0000000
--- a/config/copycat-standalone.properties
+++ /dev/null
@@ -1,37 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# These are defaults. This file just demonstrates how to override some settings.
-bootstrap.servers=localhost:9092
-
-# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
-# need to configure these based on the format they want their data in when loaded from or stored into Kafka
-key.converter=org.apache.kafka.copycat.json.JsonConverter
-value.converter=org.apache.kafka.copycat.json.JsonConverter
-# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
-# it to
-key.converter.schemas.enable=true
-value.converter.schemas.enable=true
-
-# The internal converter used for offsets and config data is configurable and must be specified, but most users will
-# always want to use the built-in default. Offset and config data is never visible outside of Copycat in this format.
-internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
-internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
-offset.storage.file.filename=/tmp/copycat.offsets
-# Flush much faster than normal, which is useful for testing/debugging
-offset.flush.interval.ms=10000

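For reference, the renamed replacement config/connect-standalone.properties (added in the file list above) should carry the same settings under the new package names. A hedged sketch of the equivalents, assuming a straight org.apache.kafka.copycat -> org.apache.kafka.connect rename of the converter classes and offset file:

    bootstrap.servers=localhost:9092

    # Converter class names assumed from the package rename
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false

    # Offset file name assumed renamed to match
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
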
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
new file mode 100644
index 0000000..21f0944
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * <p>
+ * Base class for records containing data to be copied to/from Kafka. This corresponds closely to
+ * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
+ * sources and sinks (topic, kafkaPartition, key, value). Although both implementations include a
+ * notion of offset, it is not included here because they differ in type.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public abstract class ConnectRecord {
+    private final String topic;
+    private final Integer kafkaPartition;
+    private final Schema keySchema;
+    private final Object key;
+    private final Schema valueSchema;
+    private final Object value;
+
+    public ConnectRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) {
+        this(topic, kafkaPartition, null, null, valueSchema, value);
+    }
+
+    public ConnectRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) {
+        this.topic = topic;
+        this.kafkaPartition = kafkaPartition;
+        this.keySchema = keySchema;
+        this.key = key;
+        this.valueSchema = valueSchema;
+        this.value = value;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public Integer kafkaPartition() {
+        return kafkaPartition;
+    }
+
+    public Object key() {
+        return key;
+    }
+
+    public Schema keySchema() {
+        return keySchema;
+    }
+
+    public Object value() {
+        return value;
+    }
+
+    public Schema valueSchema() {
+        return valueSchema;
+    }
+
+    @Override
+    public String toString() {
+        return "ConnectRecord{" +
+                "topic='" + topic + '\'' +
+                ", kafkaPartition=" + kafkaPartition +
+                ", key=" + key +
+                ", value=" + value +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConnectRecord that = (ConnectRecord) o;
+
+        if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
+            return false;
+        if (topic != null ? !topic.equals(that.topic) : that.topic != null)
+            return false;
+        if (keySchema != null ? !keySchema.equals(that.keySchema) : that.keySchema != null)
+            return false;
+        if (key != null ? !key.equals(that.key) : that.key != null)
+            return false;
+        if (valueSchema != null ? !valueSchema.equals(that.valueSchema) : that.valueSchema != null)
+            return false;
+        if (value != null ? !value.equals(that.value) : that.value != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topic != null ? topic.hashCode() : 0;
+        result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0);
+        result = 31 * result + (keySchema != null ? keySchema.hashCode() : 0);
+        result = 31 * result + (key != null ? key.hashCode() : 0);
+        result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        return result;
+    }
+}

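As a usage sketch (not part of this patch): ConnectRecord is abstract, so concrete record types supply the offset notion themselves. A minimal hypothetical subclass exercising the accessors, assuming the STRING_SCHEMA constant defined on the Schema interface added by this patch:

    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.data.Schema;

    // Hypothetical test-only record type; the real concrete classes are
    // SourceRecord and SinkRecord, which add their own offset fields.
    public class ExampleRecord extends ConnectRecord {
        public ExampleRecord(String topic, Integer kafkaPartition,
                             Schema keySchema, Object key,
                             Schema valueSchema, Object value) {
            super(topic, kafkaPartition, keySchema, key, valueSchema, value);
        }

        public static void main(String[] args) {
            ConnectRecord record = new ExampleRecord("test-topic", 0,
                    Schema.STRING_SCHEMA, "id-1",
                    Schema.STRING_SCHEMA, "hello");
            // Prints: ConnectRecord{topic='test-topic', kafkaPartition=0, key=id-1, value=hello}
            System.out.println(record);
        }
    }
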
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
new file mode 100644
index 0000000..934cdbd
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <p>
+ * Connectors manage integration of Kafka Connect with another system, either as an input that ingests
+ * data into Kafka or an output that passes data to an external system. Implementations should
+ * not use this class directly; they should inherit from SourceConnector or SinkConnector.
+ * </p>
+ * <p>
+ * Connectors have two primary tasks. First, given some configuration, they are responsible for
+ * creating configurations for a set of {@link Task}s that split up the data processing. For
+ * example, a database Connector might create Tasks by dividing the set of tables evenly among
+ * tasks. Second, they are responsible for monitoring inputs for changes that require
+ * reconfiguration and notifying the Kafka Connect runtime via the ConnectorContext. Continuing the
+ * previous example, the connector might periodically check for new tables and notify Kafka Connect of
+ * additions and deletions. Kafka Connect will then request new configurations and update the running
+ * Tasks.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public abstract class Connector {
+
+    protected ConnectorContext context;
+
+    /**
+     * Get the version of this connector.
+     *
+     * @return the version, formatted as a String
+     */
+    public abstract String version();
+
+    /**
+     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
+     * input configuration changes.
+     * @param ctx context object used to interact with the Kafka Connect runtime
+     */
+    public void initialize(ConnectorContext ctx) {
+        context = ctx;
+    }
+
+    /**
+     * <p>
+     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
+     * input configuration changes and using the provided set of Task configurations.
+     * This version is only used to recover from failures.
+     * </p>
+     * <p>
+     * The default implementation ignores the provided Task configurations. During recovery, Kafka Connect will request
+     * an updated set of configurations and update the running Tasks appropriately. However, Connectors should
+     * implement special handling of this case if it will avoid unnecessary changes to running Tasks.
+     * </p>
+     *
+     * @param ctx context object used to interact with the Kafka Connect runtime
+     * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
+     *                    churn in partition to task assignments
+     */
+    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
+        context = ctx;
+        // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
+        // are very different, but reduces the difficulty of implementing a Connector
+    }
+
+    /**
+     * Start this Connector. This method will only be called on a clean Connector, i.e. it has
+     * either just been instantiated and initialized or {@link #stop()} has been invoked.
+     *
+     * @param props configuration settings
+     */
+    public abstract void start(Map<String, String> props);
+
+    /**
+     * Reconfigure this Connector. Most implementations will not override this, using the default
+     * implementation that calls {@link #stop()} followed by {@link #start(Map)}.
+     * Implementations only need to override this if they want to handle this process more
+     * efficiently, e.g. without shutting down network connections to the external system.
+     *
+     * @param props new configuration settings
+     */
+    public void reconfigure(Map<String, String> props) {
+        stop();
+        start(props);
+    }
+
+    /**
+     * Returns the Task implementation for this Connector.
+     */
+    public abstract Class<? extends Task> taskClass();
+
+    /**
+     * Returns a set of configurations for Tasks based on the current configuration,
+     * producing at most {@code maxTasks} configurations.
+     *
+     * @param maxTasks maximum number of configurations to generate
+     * @return configurations for Tasks
+     */
+    public abstract List<Map<String, String>> taskConfigs(int maxTasks);
+
+    /**
+     * Stop this connector.
+     */
+    public abstract void stop();
+}

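A hedged sketch of a Connector built against this API (illustrative only; the class names, the "tables" setting, and the nested stub Task are invented for the example). It follows the division-of-work pattern described in the Javadoc above: split a list of tables across at most maxTasks configurations:

    import org.apache.kafka.connect.connector.Connector;
    import org.apache.kafka.connect.connector.Task;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ExampleTableConnector extends Connector {
        private List<String> tables;

        @Override
        public String version() {
            return "0.1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            // "tables" is a hypothetical comma-separated config setting
            tables = Arrays.asList(props.get("tables").split(","));
        }

        @Override
        public Class<? extends Task> taskClass() {
            // A real connector would return a SourceTask or SinkTask subclass
            return ExampleTableTask.class;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            int numGroups = Math.min(tables.size(), maxTasks);
            List<Map<String, String>> configs = new ArrayList<>(numGroups);
            for (int i = 0; i < numGroups; i++)
                configs.add(new HashMap<String, String>());
            // Round-robin the tables across the task configs
            for (int i = 0; i < tables.size(); i++) {
                Map<String, String> config = configs.get(i % numGroups);
                String assigned = config.get("tables");
                config.put("tables", assigned == null ? tables.get(i) : assigned + "," + tables.get(i));
            }
            return configs;
        }

        @Override
        public void stop() {
            // Nothing to clean up in this sketch
        }

        // Stub so the sketch compiles; see the Task example further below
        public static class ExampleTableTask implements Task {
            @Override
            public String version() { return "0.1.0"; }
            @Override
            public void start(Map<String, String> props) { }
            @Override
            public void stop() { }
        }
    }
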
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
new file mode 100644
index 0000000..2a06484
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * ConnectorContext allows Connectors to proactively interact with the Kafka Connect runtime.
+ */
+@InterfaceStability.Unstable
+public interface ConnectorContext {
+    /**
+     * Requests that the runtime reconfigure the Tasks for this source. This should be used to
+     * indicate to the runtime that something about the input/output has changed (e.g. partitions
+     * added/removed) and the running Tasks will need to be modified.
+     */
+    void requestTaskReconfiguration();
+}

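A sketch of how a Connector might use this hook (everything here is hypothetical: the TableLister interface, the polling interval, and the class itself). A background monitor compares snapshots of the external system and requests reconfiguration when they drift:

    import org.apache.kafka.connect.connector.ConnectorContext;

    import java.util.Set;

    public class TableChangeMonitor implements Runnable {
        // Hypothetical client interface for listing the source system's tables
        public interface TableLister {
            Set<String> listTables();
        }

        private final ConnectorContext context;
        private final TableLister lister;
        private Set<String> lastTables;

        public TableChangeMonitor(ConnectorContext context, TableLister lister) {
            this.context = context;
            this.lister = lister;
            this.lastTables = lister.listTables();
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                Set<String> current = lister.listTables();
                if (!current.equals(lastTables)) {
                    lastTables = current;
                    // The runtime responds by calling taskConfigs() again and
                    // updating the running Tasks
                    context.requestTaskReconfiguration();
                }
                try {
                    Thread.sleep(60000); // poll once a minute
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
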
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/connector/Task.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Task.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Task.java
new file mode 100644
index 0000000..850954d
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Task.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * Tasks contain the code that actually copies data to/from another system. They receive
+ * a configuration from their parent Connector, assigning them a fraction of a Kafka Connect job's work.
+ * The Kafka Connect framework then pushes/pulls data from the Task. The Task must also be able to
+ * respond to reconfiguration requests.
+ * </p>
+ * <p>
+ * Task only contains the minimal shared functionality between
+ * {@link org.apache.kafka.connect.source.SourceTask} and
+ * {@link org.apache.kafka.connect.sink.SinkTask}.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public interface Task {
+    /**
+     * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
+     *
+     * @return the version, formatted as a String
+     */
+    String version();
+
+    /**
+     * Start the Task
+     * @param props initial configuration
+     */
+    void start(Map<String, String> props);
+
+    /**
+     * Stop this task.
+     */
+    void stop();
+}

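A minimal hypothetical Task showing just this lifecycle (real tasks extend SourceTask or SinkTask from this patch, which add the actual data-transfer methods):

    import org.apache.kafka.connect.connector.Task;

    import java.util.Map;

    public class LoggingTask implements Task {
        @Override
        public String version() {
            // Usually matches the parent Connector's version
            return "0.1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            System.out.println("Task starting with config: " + props);
        }

        @Override
        public void stop() {
            System.out.println("Task stopping");
        }
    }
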
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
new file mode 100644
index 0000000..cb31ca0
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class ConnectSchema implements Schema {
+    /**
+     * Maps Schema.Types to a list of Java classes that can be used to represent them.
+     */
+    private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();
+    /**
+     * Maps known logical types to a list of Java classes that can be used to represent them.
+     */
+    private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new HashMap<>();
+
+    /**
+     * Maps the Java classes to the corresponding Schema.Type.
+     */
+    private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();
+
+    static {
+        SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class));
+        SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) Short.class));
+        SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) Integer.class));
+        SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class));
+        SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) Float.class));
+        SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) Double.class));
+        SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) Boolean.class));
+        SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) String.class));
+        // Bytes are special and have 2 representations. byte[] causes problems because it doesn't handle equals() and
+        // hashCode() like we want objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause
+        // those methods to fail, so ByteBuffers are recommended
+        SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
+        SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class));
+        SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class));
+        SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) Struct.class));
+
+        for (Map.Entry<Type, List<Class>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
+            for (Class<?> schemaClass : schemaClasses.getValue())
+                JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
+        }
+
+        LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Arrays.asList((Class) BigDecimal.class));
+        LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
+        LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
+        LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
+        // We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since that's only used to determine schemas for
+        // schemaless data and logical types will have ambiguous schemas (e.g. many of them use the same Java class) so
+        // they should not be used without schemas.
+    }
+
+    // The type of the field
+    private final Type type;
+    private final boolean optional;
+    private final Object defaultValue;
+
+    private final List<Field> fields;
+    private final Map<String, Field> fieldsByName;
+
+    private final Schema keySchema;
+    private final Schema valueSchema;
+
+    // Optional name and version provide a built-in way to indicate what type of data is included. Most
+    // useful for structs to indicate the semantics of the struct and map it to some existing underlying
+    // serializer-specific schema. However, can also be useful in specifying other logical types (e.g. a set is an array
+    // with additional constraints).
+    private final String name;
+    private final Integer version;
+    // Optional human readable documentation describing this schema.
+    private final String doc;
+    private final Map<String, String> parameters;
+
+    /**
+     * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
+     */
+    public ConnectSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc, Map<String, String> parameters, List<Field> fields, Schema keySchema, Schema valueSchema) {
+        this.type = type;
+        this.optional = optional;
+        this.defaultValue = defaultValue;
+        this.name = name;
+        this.version = version;
+        this.doc = doc;
+        this.parameters = parameters;
+
+        this.fields = fields;
+        if (this.fields != null && this.type == Type.STRUCT) {
+            this.fieldsByName = new HashMap<>();
+            for (Field field : fields)
+                fieldsByName.put(field.name(), field);
+        } else {
+            this.fieldsByName = null;
+        }
+
+        this.keySchema = keySchema;
+        this.valueSchema = valueSchema;
+    }
+
+    /**
+     * Construct a Schema for a primitive type, setting schema parameters, struct fields, and key and value schemas to null.
+     */
+    public ConnectSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc) {
+        this(type, optional, defaultValue, name, version, doc, null, null, null, null);
+    }
+
+    /**
+     * Construct a default schema for a primitive type. The schema is required, has no default value, name, version,
+     * or documentation.
+     */
+    public ConnectSchema(Type type) {
+        this(type, false, null, null, null, null);
+    }
+
+    @Override
+    public Type type() {
+        return type;
+    }
+
+    @Override
+    public boolean isOptional() {
+        return optional;
+    }
+
+    @Override
+    public Object defaultValue() {
+        return defaultValue;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Integer version() {
+        return version;
+    }
+
+    @Override
+    public String doc() {
+        return doc;
+    }
+
+    @Override
+    public Map<String, String> parameters() {
+        return parameters;
+    }
+
+    @Override
+    public List<Field> fields() {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot list fields on non-struct type");
+        return fields;
+    }
+
+    public Field field(String fieldName) {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot look up fields on non-struct 
type");
+        return fieldsByName.get(fieldName);
+    }
+
+    @Override
+    public Schema keySchema() {
+        if (type != Type.MAP)
+            throw new DataException("Cannot look up key schema on non-map 
type");
+        return keySchema;
+    }
+
+    @Override
+    public Schema valueSchema() {
+        if (type != Type.MAP && type != Type.ARRAY)
+            throw new DataException("Cannot look up value schema on non-array 
and non-map type");
+        return valueSchema;
+    }
+
+
+
+    /**
+     * Validate that the value can be used with the schema, i.e. that its type matches the schema type and nullability
+     * requirements. Throws a DataException if the value is invalid.
+     * @param schema Schema to test
+     * @param value value to test
+     */
+    public static void validateValue(Schema schema, Object value) {
+        if (value == null) {
+            if (!schema.isOptional())
+                throw new DataException("Invalid value: null used for required 
field");
+            else
+                return;
+        }
+
+        List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
+
+        if (expectedClasses == null)
+            expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+
+        if (expectedClasses == null)
+            throw new DataException("Invalid Java object for schema type " + 
schema.type() + ": " + value.getClass());
+
+        boolean foundMatch = false;
+        for (Class<?> expectedClass : expectedClasses) {
+            if (expectedClass.isInstance(value)) {
+                foundMatch = true;
+                break;
+            }
+        }
+        if (!foundMatch)
+            throw new DataException("Invalid Java object for schema type " + 
schema.type() + ": " + value.getClass());
+
+        switch (schema.type()) {
+            case STRUCT:
+                Struct struct = (Struct) value;
+                if (!struct.schema().equals(schema))
+                    throw new DataException("Struct schemas do not match.");
+                struct.validate();
+                break;
+            case ARRAY:
+                List<?> array = (List<?>) value;
+                for (Object entry : array)
+                    validateValue(schema.valueSchema(), entry);
+                break;
+            case MAP:
+                Map<?, ?> map = (Map<?, ?>) value;
+                for (Map.Entry<?, ?> entry : map.entrySet()) {
+                    validateValue(schema.keySchema(), entry.getKey());
+                    validateValue(schema.valueSchema(), entry.getValue());
+                }
+                break;
+        }
+    }
+
+    /**
+     * Validate that the value can be used for this schema, i.e. that its type matches the schema type and optional
+     * requirements. Throws a DataException if the value is invalid.
+     * @param value the value to validate
+     */
+    public void validateValue(Object value) {
+        validateValue(this, value);
+    }
+
+    @Override
+    public ConnectSchema schema() {
+        return this;
+    }
+
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConnectSchema schema = (ConnectSchema) o;
+        return Objects.equals(optional, schema.optional) &&
+                Objects.equals(type, schema.type) &&
+                Objects.equals(defaultValue, schema.defaultValue) &&
+                Objects.equals(fields, schema.fields) &&
+                Objects.equals(keySchema, schema.keySchema) &&
+                Objects.equals(valueSchema, schema.valueSchema) &&
+                Objects.equals(name, schema.name) &&
+                Objects.equals(version, schema.version) &&
+                Objects.equals(doc, schema.doc) &&
+                Objects.equals(parameters, schema.parameters);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc, parameters);
+    }
+
+    @Override
+    public String toString() {
+        if (name != null)
+            return "Schema{" + name + ":" + type + "}";
+        else
+            return "Schema{" + type + "}";
+    }
+
+
+    /**
+     * Get the {@link Type} associated with the given class.
+     *
+     * @param klass the Class to look up
+     * @return the corresponding type, or null if there is no matching type
+     */
+    public static Type schemaType(Class<?> klass) {
+        synchronized (JAVA_CLASS_SCHEMA_TYPES) {
+            Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass);
+            if (schemaType != null)
+                return schemaType;
+
+            // Since the lookup above only matches the exact class, also check whether klass is a subclass of any of the known classes
+            for (Map.Entry<Class<?>, Type> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) {
+                try {
+                    klass.asSubclass(entry.getKey());
+                    // Cache this for subsequent lookups
+                    JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue());
+                    return entry.getValue();
+                } catch (ClassCastException e) {
+                    // Expected, ignore
+                }
+            }
+        }
+        return null;
+    }
+}

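To make validateValue() concrete, a small sketch combining it with the SchemaBuilder and Struct classes added elsewhere in this patch (the STRING_SCHEMA and OPTIONAL_INT32_SCHEMA constants on the Schema interface are assumed):

    import org.apache.kafka.connect.data.ConnectSchema;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.errors.DataException;

    public class SchemaValidationExample {
        public static void main(String[] args) {
            // Struct schema with one required and one optional field
            Schema schema = SchemaBuilder.struct().name("com.example.Person")
                    .field("name", Schema.STRING_SCHEMA)
                    .field("age", Schema.OPTIONAL_INT32_SCHEMA)
                    .build();

            Struct person = new Struct(schema).put("name", "Alice").put("age", 30);
            ConnectSchema.validateValue(schema, person); // passes

            try {
                // Omitting the required "name" field fails nullability validation
                Struct bad = new Struct(schema).put("age", 30);
                ConnectSchema.validateValue(schema, bad);
            } catch (DataException e) {
                System.out.println("Rejected: " + e.getMessage());
            }
        }
    }
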