http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java new file mode 100644 index 0000000..e41364e --- /dev/null +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.file; + +import org.apache.kafka.copycat.connector.Task; +import org.apache.kafka.copycat.sink.SinkConnector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Very simple sink connector that writes records to a file, or to stdout if no + * file is configured. + */ +public class FileStreamSinkConnector extends SinkConnector { + public static final String FILE_CONFIG = "file"; + + private String filename; + + @Override + public void start(Properties props) { + filename = props.getProperty(FILE_CONFIG); + } + + @Override + public Class<? extends Task> getTaskClass() { + return FileStreamSinkTask.class; + } + + @Override + public List<Properties> getTaskConfigs(int maxTasks) { + ArrayList<Properties> configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + Properties config = new Properties(); + if (filename != null) + config.setProperty(FILE_CONFIG, filename); + configs.add(config); + } + return configs; + } + + @Override + public void stop() { + // Nothing to do since FileStreamSinkConnector has no background monitoring. + } +}
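A quick way to see the fan-out behavior of getTaskConfigs() above is to drive the connector lifecycle by hand. The following is a minimal sketch assuming only the copycat-file module on the classpath; the wrapper class FileStreamSinkConnectorExample and the /tmp output path are illustrative, not part of the commit:

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.copycat.file.FileStreamSinkConnector;

    public class FileStreamSinkConnectorExample {
        public static void main(String[] args) {
            FileStreamSinkConnector connector = new FileStreamSinkConnector();

            // Same property key the connector reads in start().
            Properties props = new Properties();
            props.setProperty(FileStreamSinkConnector.FILE_CONFIG, "/tmp/copycat-sink.out");
            connector.start(props);

            // The connector copies the single 'file' setting into every task config,
            // so asking for three tasks yields three identical Properties objects.
            List<Properties> taskConfigs = connector.getTaskConfigs(3);
            for (Properties config : taskConfigs)
                System.out.println(config.getProperty(FileStreamSinkConnector.FILE_CONFIG));

            connector.stop();
        }
    }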
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java new file mode 100644 index 0000000..7e4ca7e --- /dev/null +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.file; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.sink.SinkRecord; +import org.apache.kafka.copycat.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintStream; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; + +/** + * FileStreamSinkTask writes records to stdout or a file. 
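+ * Each record's value is written as its own line via println(), and flush() flushes the underlying stream.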
+ */ +public class FileStreamSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class); + + private PrintStream outputStream; + + public FileStreamSinkTask() { + } + + // for testing + public FileStreamSinkTask(PrintStream outputStream) { + this.outputStream = outputStream; + } + + @Override + public void start(Properties props) { + String filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG); + if (filename == null) { + outputStream = System.out; + } else { + try { + outputStream = new PrintStream(new File(filename)); + } catch (FileNotFoundException e) { + throw new CopycatException("Couldn't find or create file for FileStreamSinkTask", e); + } + } + } + + @Override + public void put(Collection<SinkRecord> sinkRecords) { + for (SinkRecord record : sinkRecords) { + outputStream.println(record.getValue()); + } + } + + @Override + public void flush(Map<TopicPartition, Long> offsets) { + outputStream.flush(); + } + + @Override + public void stop() { + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java new file mode 100644 index 0000000..4f9d8d0 --- /dev/null +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.file; + +import org.apache.kafka.copycat.connector.Task; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.source.SourceConnector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Very simple source connector that reads lines from a file, or from stdin if no + * file is configured, and publishes them to a single topic. + */ +public class FileStreamSourceConnector extends SourceConnector { + public static final String TOPIC_CONFIG = "topic"; + public static final String FILE_CONFIG = "file"; + + private String filename; + private String topic; + + @Override + public void start(Properties props) { + filename = props.getProperty(FILE_CONFIG); + topic = props.getProperty(TOPIC_CONFIG); + if (topic == null || topic.isEmpty()) + throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting"); + if (topic.contains(",")) + throw new CopycatException("FileStreamSourceConnector should only have a single topic when used as a source."); + } + + @Override + public Class<? 
extends Task> getTaskClass() { + return FileStreamSourceTask.class; + } + + @Override + public List<Properties> getTaskConfigs(int maxTasks) { + ArrayList<Properties> configs = new ArrayList<>(); + // Only one input stream makes sense. + Properties config = new Properties(); + if (filename != null) + config.setProperty(FILE_CONFIG, filename); + config.setProperty(TOPIC_CONFIG, topic); + configs.add(config); + return configs; + } + + @Override + public void stop() { + // Nothing to do since FileStreamSourceConnector has no background monitoring. + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java new file mode 100644 index 0000000..572ae1f --- /dev/null +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.file; + +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.source.SourceRecord; +import org.apache.kafka.copycat.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * FileStreamSourceTask reads from stdin or a file. 
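+ * Each line of input becomes a record on the configured topic. The task tracks its position in the file so a restarted task can skip ahead and resume where it left off.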
+ */ +public class FileStreamSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class); + + private InputStream stream; + private BufferedReader reader = null; + private char[] buffer = new char[1024]; + private int offset = 0; + private String topic = null; + + private Long streamOffset; + + @Override + public void start(Properties props) { + String filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG); + if (filename == null) { + stream = System.in; + // Tracking offset for stdin doesn't make sense + streamOffset = null; + } else { + try { + stream = new FileInputStream(filename); + Long lastRecordedOffset = (Long) context.getOffsetStorageReader().getOffset(null); + if (lastRecordedOffset != null) { + log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); + long skipLeft = lastRecordedOffset; + while (skipLeft > 0) { + try { + long skipped = stream.skip(skipLeft); + skipLeft -= skipped; + } catch (IOException e) { + log.error("Error while trying to seek to previous offset in file: ", e); + throw new CopycatException(e); + } + } + log.debug("Skipped to offset {}", lastRecordedOffset); + } + streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L; + } catch (FileNotFoundException e) { + throw new CopycatException("Couldn't find file for FileStreamSourceTask", e); + } + } + topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG); + if (topic == null) + throw new CopycatException("FileStreamSourceTask config missing 'topic' setting"); + reader = new BufferedReader(new InputStreamReader(stream)); + } + + @Override + public List<SourceRecord> poll() throws InterruptedException { + // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way. + // Instead we have to manage splitting lines ourselves, using simple backoff when no new data + // is available. + try { + final BufferedReader readerCopy; + synchronized (this) { + readerCopy = reader; + } + if (readerCopy == null) + return null; + + ArrayList<SourceRecord> records = null; + + int nread = 0; + while (readerCopy.ready()) { + nread = readerCopy.read(buffer, offset, buffer.length - offset); + + if (nread > 0) { + offset += nread; + if (offset == buffer.length) { + char[] newbuf = new char[buffer.length * 2]; + System.arraycopy(buffer, 0, newbuf, 0, buffer.length); + buffer = newbuf; + } + + String line; + do { + line = extractLine(); + if (line != null) { + if (records == null) + records = new ArrayList<>(); + records.add(new SourceRecord(null, streamOffset, topic, line)); + } + } while (line != null); + } + } + + if (nread <= 0) + Thread.sleep(1); + + return records; + } catch (IOException e) { + // Underlying stream was killed, probably as a result of calling stop. Allow poll() to return + // null, and the driving thread will handle any shutdown if necessary. + } + return null; + } + + private String extractLine() { + int until = -1, newStart = -1; + for (int i = 0; i < offset; i++) { + if (buffer[i] == '\n') { + until = i; + newStart = i + 1; + break; + } else if (buffer[i] == '\r') { + // We need to check for \r\n, so we must skip this if we can't check the next char + if (i + 1 >= offset) + return null; + + until = i; + newStart = (buffer[i + 1] == '\n') ? 
i + 2 : i + 1; + break; + } + } + + if (until != -1) { + String result = new String(buffer, 0, until); + System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart); + offset = offset - newStart; + if (streamOffset != null) + streamOffset += newStart; + return result; + } else { + return null; + } + } + + @Override + public void stop() { + log.trace("Stopping"); + synchronized (this) { + try { + stream.close(); + log.trace("Closed input stream"); + } catch (IOException e) { + log.error("Failed to close FileStreamSourceTask stream: ", e); + } + reader = null; + stream = null; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java new file mode 100644 index 0000000..643fb43 --- /dev/null +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.file; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.copycat.connector.ConnectorContext; +import org.apache.kafka.copycat.sink.SinkConnector; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.easymock.PowerMock; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class FileStreamSinkConnectorTest { + + private static final String MULTIPLE_TOPICS = "test1,test2"; + private static final String[] MULTIPLE_TOPICS_LIST + = MULTIPLE_TOPICS.split(","); + private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList( + new TopicPartition("test1", 1), new TopicPartition("test2", 2) + ); + private static final String FILENAME = "/afilename"; + + private FileStreamSinkConnector connector; + private ConnectorContext ctx; + private Properties sinkProperties; + + @Before + public void setup() { + connector = new FileStreamSinkConnector(); + ctx = PowerMock.createMock(ConnectorContext.class); + connector.initialize(ctx); + + sinkProperties = new Properties(); + sinkProperties.setProperty(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS); + sinkProperties.setProperty(FileStreamSinkConnector.FILE_CONFIG, FILENAME); + } + + @Test + public void testSinkTasks() { + PowerMock.replayAll(); + + connector.start(sinkProperties); + List<Properties> taskConfigs = connector.getTaskConfigs(1); + assertEquals(1, taskConfigs.size()); + assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG)); + + taskConfigs = connector.getTaskConfigs(2); + assertEquals(2, taskConfigs.size()); + for (int i = 0; i < 2; i++) { + assertEquals(FILENAME, taskConfigs.get(i).getProperty(FileStreamSinkConnector.FILE_CONFIG)); + } + + PowerMock.verifyAll(); + } + + @Test + public void testTaskClass() { + PowerMock.replayAll(); + + connector.start(sinkProperties); + assertEquals(FileStreamSinkTask.class, connector.getTaskClass()); + + PowerMock.verifyAll(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java new file mode 100644 index 0000000..b4e1b0c --- /dev/null +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.file; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.copycat.sink.SinkRecord; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +public class FileStreamSinkTaskTest { + + private FileStreamSinkTask task; + private ByteArrayOutputStream os; + private PrintStream printStream; + + @Before + public void setup() { + os = new ByteArrayOutputStream(); + printStream = new PrintStream(os); + task = new FileStreamSinkTask(printStream); + } + + @Test + public void testPutFlush() { + HashMap<TopicPartition, Long> offsets = new HashMap<>(); + + // We do not call task.start() since it would override the output stream + + task.put(Arrays.asList( + new SinkRecord("topic1", 0, null, "line1", 1) + )); + offsets.put(new TopicPartition("topic1", 0), 1L); + task.flush(offsets); + assertEquals("line1\n", os.toString()); + + task.put(Arrays.asList( + new SinkRecord("topic1", 0, null, "line2", 2), + new SinkRecord("topic2", 0, null, "line3", 1) + )); + offsets.put(new TopicPartition("topic1", 0), 2L); + offsets.put(new TopicPartition("topic2", 0), 1L); + task.flush(offsets); + assertEquals("line1\nline2\nline3\n", os.toString()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java new file mode 100644 index 0000000..e23055c --- /dev/null +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.file; + +import org.apache.kafka.copycat.connector.ConnectorContext; +import org.apache.kafka.copycat.errors.CopycatException; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.easymock.PowerMock; + +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class FileStreamSourceConnectorTest { + + private static final String SINGLE_TOPIC = "test"; + private static final String MULTIPLE_TOPICS = "test1,test2"; + private static final String FILENAME = "/somefilename"; + + private FileStreamSourceConnector connector; + private ConnectorContext ctx; + private Properties sourceProperties; + + @Before + public void setup() { + connector = new FileStreamSourceConnector(); + ctx = PowerMock.createMock(ConnectorContext.class); + connector.initialize(ctx); + + sourceProperties = new Properties(); + sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC); + sourceProperties.setProperty(FileStreamSourceConnector.FILE_CONFIG, FILENAME); + } + + @Test + public void testSourceTasks() { + PowerMock.replayAll(); + + connector.start(sourceProperties); + List<Properties> taskConfigs = connector.getTaskConfigs(1); + assertEquals(1, taskConfigs.size()); + assertEquals(FILENAME, + taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG)); + assertEquals(SINGLE_TOPIC, + taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG)); + + // Should be able to return fewer than requested # + taskConfigs = connector.getTaskConfigs(2); + assertEquals(1, taskConfigs.size()); + assertEquals(FILENAME, + taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG)); + assertEquals(SINGLE_TOPIC, + taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG)); + + PowerMock.verifyAll(); + } + + @Test + public void testSourceTasksStdin() { + PowerMock.replayAll(); + + sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); + connector.start(sourceProperties); + List<Properties> taskConfigs = connector.getTaskConfigs(1); + assertEquals(1, taskConfigs.size()); + assertNull(taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG)); + + PowerMock.verifyAll(); + } + + @Test(expected = CopycatException.class) + public void testMultipleSourcesInvalid() { + sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS); + connector.start(sourceProperties); + } + + @Test + public void testTaskClass() { + PowerMock.replayAll(); + + connector.start(sourceProperties); + assertEquals(FileStreamSourceTask.class, connector.getTaskClass()); + + PowerMock.verifyAll(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java new file mode 100644 index 0000000..0ec71d3 --- /dev/null +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.file; + +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.source.SourceRecord; +import org.apache.kafka.copycat.source.SourceTaskContext; +import org.apache.kafka.copycat.storage.OffsetStorageReader; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.easymock.PowerMock; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class FileStreamSourceTaskTest { + + private static final String TOPIC = "test"; + + private File tempFile; + private Properties config; + private OffsetStorageReader offsetStorageReader; + private FileStreamSourceTask task; + + private boolean verifyMocks = false; + + @Before + public void setup() throws IOException { + tempFile = File.createTempFile("file-stream-source-task-test", null); + config = new Properties(); + config.setProperty(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath()); + config.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC); + task = new FileStreamSourceTask(); + offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class); + task.initialize(new SourceTaskContext(offsetStorageReader)); + } + + @After + public void teardown() { + tempFile.delete(); + + if (verifyMocks) + PowerMock.verifyAll(); + } + + private void replay() { + PowerMock.replayAll(); + verifyMocks = true; + } + + @Test + public void testNormalLifecycle() throws InterruptedException, IOException { + expectOffsetLookupReturnNone(); + replay(); + + task.start(config); + + FileOutputStream os = new FileOutputStream(tempFile); + assertEquals(null, task.poll()); + os.write("partial line".getBytes()); + os.flush(); + assertEquals(null, task.poll()); + os.write(" finished\n".getBytes()); + os.flush(); + List<SourceRecord> records = task.poll(); + assertEquals(1, records.size()); + assertEquals(TOPIC, records.get(0).getTopic()); + assertEquals("partial line finished", records.get(0).getValue()); + assertEquals(22L, records.get(0).getSourceOffset()); + assertEquals(null, task.poll()); + + // Different line endings, and make sure the final \r doesn't result in a line until we can + // read the subsequent byte. 
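+ // The expected source offsets below are positions in the file immediately past each line's delimiter.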
+ os.write("line1\rline2\r\nline3\nline4\n\r".getBytes()); + os.flush(); + records = task.poll(); + assertEquals(4, records.size()); + assertEquals("line1", records.get(0).getValue()); + assertEquals(28L, records.get(0).getSourceOffset()); + assertEquals("line2", records.get(1).getValue()); + assertEquals(35L, records.get(1).getSourceOffset()); + assertEquals("line3", records.get(2).getValue()); + assertEquals(41L, records.get(2).getSourceOffset()); + assertEquals("line4", records.get(3).getValue()); + assertEquals(47L, records.get(3).getSourceOffset()); + + os.write("subsequent text".getBytes()); + os.flush(); + records = task.poll(); + assertEquals(1, records.size()); + assertEquals("", records.get(0).getValue()); + assertEquals(48L, records.get(0).getSourceOffset()); + + task.stop(); + } + + @Test(expected = CopycatException.class) + public void testMissingTopic() { + expectOffsetLookupReturnNone(); + replay(); + + config.remove(FileStreamSourceConnector.TOPIC_CONFIG); + task.start(config); + } + + @Test(expected = CopycatException.class) + public void testInvalidFile() { + config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename"); + task.start(config); + } + + + private void expectOffsetLookupReturnNone() { + EasyMock.expect( + offsetStorageReader.getOffset(EasyMock.anyObject(Object.class))) + .andReturn(null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java new file mode 100644 index 0000000..36a6ca8 --- /dev/null +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.copycat.data.*; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.storage.Converter; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +/** + * Implementation of Converter that uses JSON to store schemas and objects. 
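+ * Values are wrapped in an envelope object with separate "schema" and "payload" fields so that type information travels with the serialized data.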
+ */ +public class JsonConverter implements Converter<JsonNode> { + + private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS + = new HashMap<>(); + + static { + TO_COPYCAT_CONVERTERS.put(JsonSchema.BOOLEAN_TYPE_NAME, new JsonToCopycatTypeConverter() { + @Override + public Object convert(JsonNode jsonSchema, JsonNode value) { + return value.booleanValue(); + } + }); + TO_COPYCAT_CONVERTERS.put(JsonSchema.INT_TYPE_NAME, new JsonToCopycatTypeConverter() { + @Override + public Object convert(JsonNode jsonSchema, JsonNode value) { + return value.intValue(); + } + }); + TO_COPYCAT_CONVERTERS.put(JsonSchema.LONG_TYPE_NAME, new JsonToCopycatTypeConverter() { + @Override + public Object convert(JsonNode jsonSchema, JsonNode value) { + return value.longValue(); + } + }); + TO_COPYCAT_CONVERTERS.put(JsonSchema.FLOAT_TYPE_NAME, new JsonToCopycatTypeConverter() { + @Override + public Object convert(JsonNode jsonSchema, JsonNode value) { + return value.floatValue(); + } + }); + TO_COPYCAT_CONVERTERS.put(JsonSchema.DOUBLE_TYPE_NAME, new JsonToCopycatTypeConverter() { + @Override + public Object convert(JsonNode jsonSchema, JsonNode value) { + return value.doubleValue(); + } + }); + TO_COPYCAT_CONVERTERS.put(JsonSchema.BYTES_TYPE_NAME, new JsonToCopycatTypeConverter() { + @Override + public Object convert(JsonNode jsonSchema, JsonNode value) { + try { + return value.binaryValue(); + } catch (IOException e) { + throw new CopycatException("Invalid bytes field", e); + } + } + }); + TO_COPYCAT_CONVERTERS.put(JsonSchema.STRING_TYPE_NAME, new JsonToCopycatTypeConverter() { + @Override + public Object convert(JsonNode jsonSchema, JsonNode value) { + return value.textValue(); + } + }); + TO_COPYCAT_CONVERTERS.put(JsonSchema.ARRAY_TYPE_NAME, new JsonToCopycatTypeConverter() { + @Override + public Object convert(JsonNode jsonSchema, JsonNode value) { + JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME); + if (elemSchema == null) + throw new CopycatException("Array schema did not specify the element type"); + ArrayList<Object> result = new ArrayList<>(); + for (JsonNode elem : value) { + result.add(convertToCopycat(elemSchema, elem)); + } + return result; + } + }); + + } + + @Override + public JsonNode fromCopycatData(Object value) { + return convertToJsonWithSchemaEnvelope(value); + } + + @Override + public Object toCopycatData(JsonNode value) { + if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) + throw new CopycatException("JSON value converted to Copycat must be in envelope containing schema"); + + return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); + } + + + private static JsonNode asJsonSchema(Schema schema) { + switch (schema.getType()) { + case BOOLEAN: + return JsonSchema.BOOLEAN_SCHEMA; + case BYTES: + return JsonSchema.BYTES_SCHEMA; + case DOUBLE: + return JsonSchema.DOUBLE_SCHEMA; + case FLOAT: + return JsonSchema.FLOAT_SCHEMA; + case INT: + return JsonSchema.INT_SCHEMA; + case LONG: + return JsonSchema.LONG_SCHEMA; + case NULL: + throw new UnsupportedOperationException("null schema not supported"); + case STRING: + return JsonSchema.STRING_SCHEMA; + case UNION: { + throw new UnsupportedOperationException("union schema not supported"); + } + case ARRAY: + return JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME) + 
.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.getElementType())); + case ENUM: + throw new UnsupportedOperationException("enum schema not supported"); + case MAP: + throw new UnsupportedOperationException("map schema not supported"); + default: + throw new CopycatException("Couldn't translate unsupported schema type " + schema.getType().getName() + "."); + } + } + + + private static Schema asCopycatSchema(JsonNode jsonSchema) { + if (jsonSchema.isNull()) + return null; + + JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME); + if (schemaTypeNode == null || !schemaTypeNode.isTextual()) + throw new CopycatException("Schema must contain 'type' field"); + + switch (schemaTypeNode.textValue()) { + case JsonSchema.BOOLEAN_TYPE_NAME: + return SchemaBuilder.builder().booleanType(); + case JsonSchema.INT_TYPE_NAME: + return SchemaBuilder.builder().intType(); + case JsonSchema.LONG_TYPE_NAME: + return SchemaBuilder.builder().longType(); + case JsonSchema.FLOAT_TYPE_NAME: + return SchemaBuilder.builder().floatType(); + case JsonSchema.DOUBLE_TYPE_NAME: + return SchemaBuilder.builder().doubleType(); + case JsonSchema.BYTES_TYPE_NAME: + return SchemaBuilder.builder().bytesType(); + case JsonSchema.STRING_TYPE_NAME: + return SchemaBuilder.builder().stringType(); + case JsonSchema.ARRAY_TYPE_NAME: + JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME); + if (elemSchema == null) + throw new CopycatException("Array schema did not specify the element type"); + return Schema.createArray(asCopycatSchema(elemSchema)); + default: + throw new CopycatException("Unknown schema type: " + schemaTypeNode.textValue()); + } + } + + + /** + * Convert this object, in org.apache.kafka.copycat.data format, into a JSON object with an envelope object + * containing schema and payload fields. + * @param value the value to convert + * @return a JSON node wrapping both the schema and the converted payload + */ + private static JsonNode convertToJsonWithSchemaEnvelope(Object value) { + return convertToJson(value).toJsonNode(); + } + + /** + * Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema + * and the converted object. 
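+ * Collections are encoded as arrays; every element must convert to the same item schema.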
+ */ + private static JsonSchema.Envelope convertToJson(Object value) { + if (value == null) { + return JsonSchema.nullEnvelope(); + } else if (value instanceof Boolean) { + return JsonSchema.booleanEnvelope((Boolean) value); + } else if (value instanceof Byte) { + return JsonSchema.intEnvelope((Byte) value); + } else if (value instanceof Short) { + return JsonSchema.intEnvelope((Short) value); + } else if (value instanceof Integer) { + return JsonSchema.intEnvelope((Integer) value); + } else if (value instanceof Long) { + return JsonSchema.longEnvelope((Long) value); + } else if (value instanceof Float) { + return JsonSchema.floatEnvelope((Float) value); + } else if (value instanceof Double) { + return JsonSchema.doubleEnvelope((Double) value); + } else if (value instanceof byte[]) { + return JsonSchema.bytesEnvelope((byte[]) value); + } else if (value instanceof ByteBuffer) { + return JsonSchema.bytesEnvelope(((ByteBuffer) value).array()); + } else if (value instanceof CharSequence) { + return JsonSchema.stringEnvelope(value.toString()); + } else if (value instanceof Collection) { + Collection collection = (Collection) value; + ObjectNode schema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME); + ArrayNode list = JsonNodeFactory.instance.arrayNode(); + JsonNode itemSchema = null; + for (Object elem : collection) { + JsonSchema.Envelope fieldSchemaAndValue = convertToJson(elem); + if (itemSchema == null) { + itemSchema = fieldSchemaAndValue.schema; + schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema); + } else { + if (!itemSchema.equals(fieldSchemaAndValue.schema)) + throw new CopycatException("Mismatching schemas found in a list."); + } + + list.add(fieldSchemaAndValue.payload); + } + return new JsonSchema.Envelope(schema, list); + } + + throw new CopycatException("Couldn't convert " + value + " to JSON."); + } + + + private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) { + if (jsonSchema.isNull()) + return null; + + JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME); + if (schemaTypeNode == null || !schemaTypeNode.isTextual()) + throw new CopycatException("Schema must contain 'type' field. Schema: " + jsonSchema.toString()); + + JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue()); + if (typeConverter != null) + return typeConverter.convert(jsonSchema, jsonValue); + + throw new CopycatException("Unknown schema type: " + schemaTypeNode); + } + + + private interface JsonToCopycatTypeConverter { + Object convert(JsonNode schema, JsonNode value); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java new file mode 100644 index 0000000..29c7bac --- /dev/null +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.copycat.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; + +/** + * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily + * structured data without having associated Java classes. This deserializer also supports Copycat schemas. + */ +public class JsonDeserializer implements Deserializer<JsonNode> { + private static final ObjectNode CATCH_ALL_OBJECT_SCHEMA = JsonNodeFactory.instance.objectNode(); + private static final ObjectNode CATCH_ALL_ARRAY_SCHEMA = JsonNodeFactory.instance.objectNode(); + private static final ArrayNode ALL_SCHEMAS_LIST = JsonNodeFactory.instance.arrayNode(); + private static final ObjectNode CATCH_ALL_SCHEMA = JsonNodeFactory.instance.objectNode(); + static { + CATCH_ALL_OBJECT_SCHEMA.put("type", "object") + .putArray("field").add(JsonNodeFactory.instance.objectNode().put("*", "all")); + + CATCH_ALL_ARRAY_SCHEMA.put("type", "array").put("items", "all"); + + ALL_SCHEMAS_LIST.add("boolean").add("int").add("long").add("float").add("double").add("bytes").add("string") + .add(CATCH_ALL_ARRAY_SCHEMA).add(CATCH_ALL_OBJECT_SCHEMA); + + CATCH_ALL_SCHEMA.put("name", "all").set("type", ALL_SCHEMAS_LIST); + } + + private ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Default constructor needed by Kafka + */ + public JsonDeserializer() { + } + + @Override + public void configure(Map<String, ?> props, boolean isKey) { + } + + @Override + public JsonNode deserialize(String topic, byte[] bytes) { + JsonNode data; + try { + data = objectMapper.readTree(bytes); + } catch (Exception e) { + throw new SerializationException(e); + } + + // The deserialized data should either be an envelope object containing the schema and the payload or the schema + // was stripped during serialization and we need to fill in an all-encompassing schema. 
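+ // The catch-all schema assembled in the static initializer above accepts any of the supported primitive, array, or object types.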
+ if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) { + ObjectNode envelope = JsonNodeFactory.instance.objectNode(); + envelope.set("schema", CATCH_ALL_SCHEMA); + envelope.set("payload", data); + data = envelope; + } + + return data; + } + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java new file mode 100644 index 0000000..a807e0f --- /dev/null +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.nio.ByteBuffer; + +public class JsonSchema { + + static final String ENVELOPE_SCHEMA_FIELD_NAME = "schema"; + static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload"; + static final String SCHEMA_TYPE_FIELD_NAME = "type"; + static final String SCHEMA_NAME_FIELD_NAME = "name"; + static final String ARRAY_ITEMS_FIELD_NAME = "items"; + static final String BOOLEAN_TYPE_NAME = "boolean"; + static final JsonNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME); + static final String INT_TYPE_NAME = "int"; + static final JsonNode INT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT_TYPE_NAME); + static final String LONG_TYPE_NAME = "long"; + static final JsonNode LONG_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, LONG_TYPE_NAME); + static final String FLOAT_TYPE_NAME = "float"; + static final JsonNode FLOAT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, FLOAT_TYPE_NAME); + static final String DOUBLE_TYPE_NAME = "double"; + static final JsonNode DOUBLE_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, DOUBLE_TYPE_NAME); + static final String BYTES_TYPE_NAME = "bytes"; + static final JsonNode BYTES_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BYTES_TYPE_NAME); + static final String STRING_TYPE_NAME = "string"; + static final JsonNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME); + static final String ARRAY_TYPE_NAME = "array"; + + public static ObjectNode envelope(JsonNode schema, JsonNode payload) { + 
ObjectNode result = JsonNodeFactory.instance.objectNode(); + result.set(ENVELOPE_SCHEMA_FIELD_NAME, schema); + result.set(ENVELOPE_PAYLOAD_FIELD_NAME, payload); + return result; + } + + static class Envelope { + public JsonNode schema; + public JsonNode payload; + + public Envelope(JsonNode schema, JsonNode payload) { + this.schema = schema; + this.payload = payload; + } + + public ObjectNode toJsonNode() { + return envelope(schema, payload); + } + } + + + public static Envelope nullEnvelope() { + return new Envelope(null, null); + } + + public static Envelope booleanEnvelope(boolean value) { + return new Envelope(JsonSchema.BOOLEAN_SCHEMA, JsonNodeFactory.instance.booleanNode(value)); + } + + public static Envelope intEnvelope(byte value) { + return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value)); + } + + public static Envelope intEnvelope(short value) { + return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value)); + } + + public static Envelope intEnvelope(int value) { + return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value)); + } + + public static Envelope longEnvelope(long value) { + return new Envelope(JsonSchema.LONG_SCHEMA, JsonNodeFactory.instance.numberNode(value)); + } + + public static Envelope floatEnvelope(float value) { + return new Envelope(JsonSchema.FLOAT_SCHEMA, JsonNodeFactory.instance.numberNode(value)); + } + + public static Envelope doubleEnvelope(double value) { + return new Envelope(JsonSchema.DOUBLE_SCHEMA, JsonNodeFactory.instance.numberNode(value)); + } + + public static Envelope bytesEnvelope(byte[] value) { + return new Envelope(JsonSchema.BYTES_SCHEMA, JsonNodeFactory.instance.binaryNode(value)); + } + + public static Envelope bytesEnvelope(ByteBuffer value) { + return new Envelope(JsonSchema.BYTES_SCHEMA, JsonNodeFactory.instance.binaryNode(value.array())); + } + + public static Envelope stringEnvelope(CharSequence value) { + return new Envelope(JsonSchema.STRING_SCHEMA, JsonNodeFactory.instance.textNode(value.toString())); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java new file mode 100644 index 0000000..dcac270 --- /dev/null +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ +package org.apache.kafka.copycat.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +/** + * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily + * structured data without corresponding Java classes. This serializer also supports Copycat schemas. + */ +public class JsonSerializer implements Serializer<JsonNode> { + + private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable"; + private static final boolean SCHEMAS_ENABLE_DEFAULT = true; + + private final ObjectMapper objectMapper = new ObjectMapper(); + private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT; + + /** + * Default constructor needed by Kafka + */ + public JsonSerializer() { + + } + + @Override + public void configure(Map<String, ?> config, boolean isKey) { + Object enableConfigsVal = config.get(SCHEMAS_ENABLE_CONFIG); + if (enableConfigsVal != null) + enableSchemas = enableConfigsVal.toString().equals("true"); + } + + @Override + public byte[] serialize(String topic, JsonNode data) { + // This serializer works for Copycat data that requires a schema to be included, so we expect it to have a + // specific format: { "schema": {...}, "payload": ... }. + if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) + throw new SerializationException("JsonSerializer requires \"schema\" and \"payload\" fields and may not contain additional fields"); + + try { + if (!enableSchemas) + data = data.get("payload"); + return objectMapper.writeValueAsBytes(data); + } catch (Exception e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java new file mode 100644 index 0000000..1a725c9 --- /dev/null +++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.json; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import org.junit.Test; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JsonConverterTest { + + ObjectMapper objectMapper = new ObjectMapper(); + JsonConverter converter = new JsonConverter(); + + @Test + public void booleanToCopycat() { + assertEquals(true, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }"))); + assertEquals(false, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }"))); + } + + @Test + public void intToCopycat() { + assertEquals(12, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int\" }, \"payload\": 12 }"))); + } + + @Test + public void longToCopycat() { + assertEquals(12L, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"long\" }, \"payload\": 12 }"))); + assertEquals(4398046511104L, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"long\" }, \"payload\": 4398046511104 }"))); + } + + @Test + public void floatToCopycat() { + assertEquals(12.34f, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }"))); + } + + @Test + public void doubleToCopycat() { + assertEquals(12.34, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }"))); + } + + + @Test + public void bytesToCopycat() throws UnsupportedEncodingException { + ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8")); + String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }"; + ByteBuffer converted = ByteBuffer.wrap((byte[]) converter.toCopycatData(parse(msg))); + assertEquals(reference, converted); + } + + @Test + public void stringToCopycat() { + assertEquals("foo-bar-baz", converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }"))); + } + + @Test + public void arrayToCopycat() { + JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int\" } }, \"payload\": [1, 2, 3] }"); + assertEquals(Arrays.asList(1, 2, 3), converter.toCopycatData(arrayJson)); + } + + + @Test + public void booleanToJson() { + JsonNode converted = converter.fromCopycatData(true); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"boolean\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); + } + + @Test + public void intToJson() { + JsonNode converted = converter.fromCopycatData(12); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"int\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue()); + } + + @Test + public void longToJson() { + JsonNode converted = converter.fromCopycatData(4398046511104L); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"long\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue()); 
+ } + + @Test + public void floatToJson() { + JsonNode converted = converter.fromCopycatData(12.34f); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"float\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001); + } + + @Test + public void doubleToJson() { + JsonNode converted = converter.fromCopycatData(12.34); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"double\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001); + } + + @Test + public void bytesToJson() throws IOException { + JsonNode converted = converter.fromCopycatData("test-string".getBytes()); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"bytes\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertEquals(ByteBuffer.wrap("test-string".getBytes()), + ByteBuffer.wrap(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue())); + } + + @Test + public void stringToJson() { + JsonNode converted = converter.fromCopycatData("test-string"); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"string\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue()); + } + + @Test + public void arrayToJson() { + JsonNode converted = converter.fromCopycatData(Arrays.asList(1, 2, 3)); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int\" } }"), + converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add(2).add(3), + converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); + } + + + private JsonNode parse(String json) { + try { + return objectMapper.readTree(json); + } catch (IOException e) { + fail("IOException during JSON parse: " + e.getMessage()); + throw new RuntimeException("failed"); + } + } + + private void validateEnvelope(JsonNode env) { + assertNotNull(env); + assertTrue(env.isObject()); + assertEquals(2, env.size()); + assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject()); + assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java new file mode 100644 index 0000000..130a529 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.cli; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.copycat.runtime.Herder; +import org.apache.kafka.copycat.runtime.Worker; +import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder; +import org.apache.kafka.copycat.util.Callback; +import org.apache.kafka.copycat.util.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Properties; + +/** + * <p> + * Command line utility that runs Copycat as a standalone process. In this mode, work is not + * distributed. Instead, all the normal Copycat machinery works within a single process. This is + * useful for ad hoc, small, or experimental jobs. + * </p> + * <p> + * By default, no job configs or offset data are persisted. You can make jobs persistent and + * fault tolerant by overriding the settings to use file storage for both. + * </p> + */ +@InterfaceStability.Unstable +public class CopycatStandalone { + private static final Logger log = LoggerFactory.getLogger(CopycatStandalone.class); + + public static void main(String[] args) throws Exception { + Properties workerProps; + Properties connectorProps; + + if (args.length < 2) { + log.info("Usage: CopycatStandalone worker.properties connector1.properties [connector2.properties ...]"); + System.exit(1); + } + + String workerPropsFile = args[0]; + workerProps = !workerPropsFile.isEmpty() ?
Utils.loadProps(workerPropsFile) : new Properties(); + + WorkerConfig workerConfig = new WorkerConfig(workerProps); + Worker worker = new Worker(workerConfig); + Herder herder = new StandaloneHerder(worker); + final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, herder); + copycat.start(); + + try { + for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) { + connectorProps = Utils.loadProps(connectorPropsFile); + FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() { + @Override + public void onCompletion(Throwable error, String id) { + if (error != null) + // Pass the Throwable as the final argument so the failure's stack trace is logged, not swallowed. + log.error("Failed to create job for {}", connectorPropsFile, error); + } + }); + herder.addConnector(connectorProps, cb); + cb.get(); + } + } catch (Throwable t) { + log.error("Stopping after connector error", t); + copycat.stop(); + } + + // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request + copycat.awaitStop(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java new file mode 100644 index 0000000..46229db --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.cli; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import java.util.Properties; +import java.util.Set; + +/** + * Configuration for standalone workers. + */ +@InterfaceStability.Unstable +public class WorkerConfig extends AbstractConfig { + + public static final String CLUSTER_CONFIG = "cluster"; + private static final String + CLUSTER_CONFIG_DOC = + "ID for this cluster, which is used to provide a namespace so multiple Copycat clusters " + + "or instances may co-exist while sharing a single Kafka cluster."; + public static final String CLUSTER_DEFAULT = "copycat"; + + public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; + public static final String BOOTSTRAP_SERVERS_DOC
= "A list of host/port pairs to use for establishing the initial connection to the Kafka " + + "cluster. The client will make use of all servers irrespective of which servers are " + + "specified here for bootstrapping—this list only impacts the initial hosts used " + + "to discover the full set of servers. This list should be in the form " + + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the " + + "initial connection to discover the full cluster membership (which may change " + + "dynamically), this list need not contain the full set of servers (you may want more " + + "than one, though, in case a server is down)."; + public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; + + public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter"; + public static final String KEY_CONVERTER_CLASS_DOC = + "Converter class for key Copycat data that implements the <code>Converter</code> interface."; + + public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter"; + public static final String VALUE_CONVERTER_CLASS_DOC = + "Converter class for value Copycat data that implements the <code>Converter</code> interface."; + + public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; + public static final String KEY_SERIALIZER_CLASS_DOC = + "Serializer class for key that implements the <code>Serializer</code> interface."; + + public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; + public static final String VALUE_SERIALIZER_CLASS_DOC = + "Serializer class for value that implements the <code>Serializer</code> interface."; + + public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; + public static final String KEY_DESERIALIZER_CLASS_DOC = + "Deserializer class for key that implements the <code>Deserializer</code> interface."; + + public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; + public static final String VALUE_DESERIALIZER_CLASS_DOC = + "Deserializer class for value that implements the <code>Deserializer</code> interface."; + + public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG + = "task.shutdown.graceful.timeout.ms"; + private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC = + "Amount of time to wait for tasks to shut down gracefully. This is the total amount of time," +
+ " not per task. Shutdown is triggered for all tasks, and then they are waited on sequentially."; + private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000"; + + public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms"; + private static final String OFFSET_COMMIT_INTERVAL_MS_DOC + = "Interval at which to try committing offsets for tasks."; + public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L; + + public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms"; + private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC + = "Maximum number of milliseconds to wait for records to flush and partition offset data to be" + + " committed to offset storage before cancelling the process and restoring the offset " + + "data to be committed in a future attempt."; + public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L; + + private static ConfigDef config; + + static { + config = new ConfigDef() + .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC) + .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT, + Importance.HIGH, BOOTSTRAP_SERVERS_DOC) + .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, KEY_CONVERTER_CLASS_DOC) + .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) + .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) + .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) + .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) + .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, + TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, + TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) + .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT, + Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC) + .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, + Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC); + } + + public WorkerConfig() { + this(new Properties()); + } + + public WorkerConfig(Properties props) { + super(config, props); + } + + public Properties getUnusedProperties() { + Set<String> unusedKeys = this.unused(); + Properties unusedProps = new Properties(); + for (String key : unusedKeys) { + unusedProps.put(key, this.originals().get(key)); + } + return unusedProps; + } +}
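Since WorkerConfig extends AbstractConfig, values read from it come back typed, validated, and defaulted. A minimal usage sketch (an illustration, not part of this patch; the class name WorkerConfigExample is invented here): the six converter/serializer/deserializer keys are declared without defaults, so they must be supplied, and because they are Type.CLASS entries the named classes are resolved while the Properties are parsed, so they must exist on the classpath. JsonConverter comes from this patch; the byte-array (de)serializers are stand-ins from the Kafka clients library.

    import org.apache.kafka.copycat.cli.WorkerConfig;

    import java.util.List;
    import java.util.Properties;

    public class WorkerConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
            // Required keys with no defaults; these class names are stand-ins, not a recommendation.
            props.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
            props.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
            props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            WorkerConfig config = new WorkerConfig(props);
            // The comma-separated Type.LIST value is parsed into a List<String>.
            List<String> brokers = config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG);
            // Falls back to the declared default of 5000 ms when the key is absent.
            long graceMs = config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
            System.out.println(brokers + ", " + graceMs);
        }
    }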
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java new file mode 100644 index 0000000..e3fcc1c --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import java.util.Properties; +import java.util.Set; + +/** + * <p> + * Configuration options for Connectors. These include only Copycat system-level configuration + * options (e.g. Connector class name, timeouts used by Copycat to control the connector), but do + * not include Connector-specific options (e.g. database connection settings). + * </p> + * <p> + * Note that some of these options are not required for all connectors. For example, TOPICS_CONFIG + * is sink-specific. + * </p> + */ +public class ConnectorConfig extends AbstractConfig { + + public static final String NAME_CONFIG = "name"; + private static final String NAME_DOC = "Globally unique name to use for this connector."; + + public static final String CONNECTOR_CLASS_CONFIG = "connector.class"; + private static final String CONNECTOR_CLASS_DOC = + "Name of the class for this connector. Must be a subclass of org.apache.kafka.copycat.connector" + + ".Connector"; + + public static final String TASKS_MAX_CONFIG = "tasks.max"; + private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; + public static final int TASKS_MAX_DEFAULT = 1; + + public static final String TOPICS_CONFIG = "topics"; + private static final String TOPICS_DOC = ""; + public static final String TOPICS_DEFAULT = ""; + + private static ConfigDef config; + + static { + config = new ConfigDef() + .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC) + .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC) + .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC) + .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC); + } + + private Properties originalProperties; + + public ConnectorConfig() { + this(new Properties()); + } + + public ConnectorConfig(Properties props) { + super(config, props); + this.originalProperties = props; + } + + public Properties getUnusedProperties() { + Set<String> unusedKeys = this.unused(); + Properties unusedProps = new Properties(); + for (String key : unusedKeys) { + unusedProps.setProperty(key, originalProperties.getProperty(key)); + } + return unusedProps; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java new file mode 100644 index 0000000..e8dfe14 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class ties together all the components of a Copycat process (herder, worker, + * storage, command interface), managing their lifecycle. + */ +@InterfaceStability.Unstable +public class Copycat { + private static final Logger log = LoggerFactory.getLogger(Copycat.class); + + private final Worker worker; + private final Herder herder; + private final CountDownLatch startLatch = new CountDownLatch(1); + private final CountDownLatch stopLatch = new CountDownLatch(1); + private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final ShutdownHook shutdownHook; + + public Copycat(Worker worker, Herder herder) { + log.debug("Copycat created"); + this.worker = worker; + this.herder = herder; + shutdownHook = new ShutdownHook(); + } + + public void start() { + log.info("Copycat starting"); + Runtime.getRuntime().addShutdownHook(shutdownHook); + + worker.start(); + herder.start(); + + log.info("Copycat started"); + + startLatch.countDown(); + } + + public void stop() { + boolean wasShuttingDown = shutdown.getAndSet(true); + if (!wasShuttingDown) { + log.info("Copycat stopping"); + + herder.stop(); + worker.stop(); + + log.info("Copycat stopped"); + } + + stopLatch.countDown(); + } + + public void awaitStop() { + try { + stopLatch.await(); + } catch (InterruptedException e) { + log.error("Interrupted waiting for Copycat to shut down"); + // Restore the interrupt status so callers can observe that the wait was interrupted. + Thread.currentThread().interrupt(); + } + } + + private class ShutdownHook extends Thread { + @Override + public void run() { + try { + startLatch.await(); + Copycat.this.stop(); + } catch (InterruptedException e) { + log.error("Interrupted in shutdown hook while waiting for Copycat startup to finish"); + } + } + } +}
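Putting the runtime pieces together: below is a minimal embedding sketch, not part of the patch, that wires the same Worker/StandaloneHerder/Copycat lifecycle that CopycatStandalone drives from the command line. The class name EmbeddedCopycatExample, the topic, and the output path are invented for illustration; the worker Properties are assumed to be populated as in the WorkerConfig sketch earlier, and the file sink connector with its "file" setting comes from elsewhere in this patch.

    import org.apache.kafka.copycat.cli.WorkerConfig;
    import org.apache.kafka.copycat.runtime.Copycat;
    import org.apache.kafka.copycat.runtime.Herder;
    import org.apache.kafka.copycat.runtime.Worker;
    import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
    import org.apache.kafka.copycat.util.Callback;
    import org.apache.kafka.copycat.util.FutureCallback;

    import java.util.Properties;

    public class EmbeddedCopycatExample {
        public static void main(String[] args) throws Exception {
            Properties workerProps = new Properties();
            // Populate bootstrap.servers and the required converter/serializer/deserializer
            // keys exactly as in the WorkerConfig sketch above.

            Worker worker = new Worker(new WorkerConfig(workerProps));
            Herder herder = new StandaloneHerder(worker);
            final Copycat copycat = new Copycat(worker, herder);
            copycat.start(); // also registers the shutdown hook that later drives stop()

            Properties connectorProps = new Properties();
            connectorProps.setProperty("name", "local-file-sink");
            connectorProps.setProperty("connector.class", "org.apache.kafka.copycat.file.FileStreamSinkConnector");
            connectorProps.setProperty("topics", "copycat-test");
            connectorProps.setProperty("file", "/tmp/copycat.sink.txt"); // connector-specific setting

            // addConnector is asynchronous; FutureCallback lets the caller block on completion.
            FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
                @Override
                public void onCompletion(Throwable error, String id) {
                    if (error != null)
                        error.printStackTrace();
                }
            });
            herder.addConnector(connectorProps, cb);
            cb.get(); // wait for the herder to finish creating the connector

            copycat.awaitStop(); // blocks until Ctrl-C triggers the shutdown hook
        }
    }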