Added (insert-only) UpdateableDataContext capabilities Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/0e7545e7 Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/0e7545e7 Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/0e7545e7
Branch: refs/heads/master Commit: 0e7545e74d80985c2b90fff6288b90f8e509ea40 Parents: 9f4d15e Author: Kasper Sørensen <[email protected]> Authored: Sun Jan 28 15:50:04 2018 -0800 Committer: Kasper Sørensen <[email protected]> Committed: Sun Jan 28 15:50:38 2018 -0800 ---------------------------------------------------------------------- .../kafka/ConsumerAndProducerFactory.java | 35 ++++ .../apache/metamodel/kafka/ConsumerFactory.java | 32 ---- .../kafka/KafkaConsumerAndProducerFactory.java | 162 +++++++++++++++++++ .../metamodel/kafka/KafkaConsumerFactory.java | 104 ------------ .../metamodel/kafka/KafkaDataContext.java | 34 +++- .../metamodel/kafka/KafkaInsertBuilder.java | 54 +++++++ .../metamodel/kafka/KafkaUpdateCallback.java | 85 ++++++++++ .../metamodel/kafka/KafkaDataContextTest.java | 6 +- 8 files changed, 365 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java new file mode 100644 index 0000000..b3c58ea --- /dev/null +++ b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; + +/** + * Factory interface for Kafka {@link Consumer} and {@link Producer} objects to + * be used by Apache MetaModel. Since Apache MetaModel may potentially serve + * multiple queries at the same times, multiple consumers may be needed. This + * class determines how these are instantiated. + */ +public interface ConsumerAndProducerFactory { + + public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass); + + public <K, V> Producer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass); +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java deleted file mode 100644 index efabf38..0000000 --- a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.kafka; - -import org.apache.kafka.clients.consumer.Consumer; - -/** - * Factory interface for Kafka {@link Consumer} objects to be used by Apache - * MetaModel. Since Apache MetaModel may potentially serve multiple queries at - * the same times, multiple consumers may be needed. This class determines how - * these are instantiated. - */ -public interface ConsumerFactory { - - public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass); -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java new file mode 100644 index 0000000..c0d4b8f --- /dev/null +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.kafka; + +import java.nio.ByteBuffer; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.ByteBufferDeserializer; +import org.apache.kafka.common.serialization.ByteBufferSerializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.FloatDeserializer; +import org.apache.kafka.common.serialization.FloatSerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.ShortDeserializer; +import org.apache.kafka.common.serialization.ShortSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; + +/** + * Default {@link ConsumerAndProducerFactory} implementation. + */ +public class KafkaConsumerAndProducerFactory implements ConsumerAndProducerFactory { + + private final Properties baseProperties; + + public KafkaConsumerAndProducerFactory(String bootstrapServers) { + this.baseProperties = new Properties(); + this.baseProperties.setProperty("bootstrap.servers", bootstrapServers); + } + + public KafkaConsumerAndProducerFactory(Properties baseProperties) { + this.baseProperties = baseProperties; + } + + @Override + public <K, V> Producer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass) { + final Properties properties = new Properties(); + baseProperties.stringPropertyNames().forEach(k -> { + properties.setProperty(k, baseProperties.getProperty(k)); + }); + + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerForClass(keyClass).getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerForClass(keyClass).getName()); + + return new KafkaProducer<>(baseProperties); + } + + @Override + public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) { + final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis(); + + final Properties properties = new Properties(); + baseProperties.stringPropertyNames().forEach(k -> { + properties.setProperty(k, baseProperties.getProperty(k)); + }); + + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass) + .getName()); + return new KafkaConsumer<>(properties); + } + + private static Class<? extends Serializer<?>> serializerForClass(Class<?> cls) { + if (cls == String.class || cls == CharSequence.class) { + return StringSerializer.class; + } + if (cls == Double.class) { + return DoubleSerializer.class; + } + if (cls == Integer.class) { + return IntegerSerializer.class; + } + if (cls == Float.class) { + return FloatSerializer.class; + } + if (cls == Long.class) { + return LongSerializer.class; + } + if (cls == Short.class) { + return ShortSerializer.class; + } + if (cls == Bytes.class) { + return BytesSerializer.class; + } + if (cls == ByteBuffer.class) { + return ByteBufferSerializer.class; + } + if (cls == byte[].class || cls == Byte[].class || cls == Object.class) { + return ByteArraySerializer.class; + } + // fall back to doing nothing (byte[]) + return ByteArraySerializer.class; + } + + private static Class<? extends Deserializer<?>> deserializerForClass(Class<?> cls) { + if (cls == String.class || cls == CharSequence.class) { + return StringDeserializer.class; + } + if (cls == Double.class) { + return DoubleDeserializer.class; + } + if (cls == Integer.class) { + return IntegerDeserializer.class; + } + if (cls == Float.class) { + return FloatDeserializer.class; + } + if (cls == Long.class) { + return LongDeserializer.class; + } + if (cls == Short.class) { + return ShortDeserializer.class; + } + if (cls == Bytes.class) { + return BytesDeserializer.class; + } + if (cls == ByteBuffer.class) { + return ByteBufferDeserializer.class; + } + if (cls == byte[].class || cls == Byte[].class || cls == Object.class) { + return ByteArrayDeserializer.class; + } + // fall back to doing nothing (byte[]) + return ByteArrayDeserializer.class; + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java deleted file mode 100644 index 15e5f70..0000000 --- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.metamodel.kafka; - -import java.nio.ByteBuffer; -import java.util.Properties; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteBufferDeserializer; -import org.apache.kafka.common.serialization.BytesDeserializer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.DoubleDeserializer; -import org.apache.kafka.common.serialization.FloatDeserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.ShortDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.Bytes; - -/** - * Default {@link ConsumerFactory} implementation. - */ -public class KafkaConsumerFactory implements ConsumerFactory { - - private final Properties baseProperties; - - public KafkaConsumerFactory(String bootstrapServers) { - this.baseProperties = new Properties(); - this.baseProperties.setProperty("bootstrap.servers", bootstrapServers); - } - - public KafkaConsumerFactory(Properties baseProperties) { - this.baseProperties = baseProperties; - } - - @Override - public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) { - final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis(); - - final Properties properties = new Properties(); - baseProperties.stringPropertyNames().forEach(k -> { - properties.setProperty(k, baseProperties.getProperty(k)); - }); - - properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); - properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName()); - properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass) - .getName()); - return new KafkaConsumer<>(properties); - } - - private static Class<? extends Deserializer<?>> deserializerForClass(Class<?> cls) { - if (cls == String.class || cls == CharSequence.class) { - return StringDeserializer.class; - } - if (cls == Double.class) { - return DoubleDeserializer.class; - } - if (cls == Integer.class) { - return IntegerDeserializer.class; - } - if (cls == Float.class) { - return FloatDeserializer.class; - } - if (cls == Long.class) { - return LongDeserializer.class; - } - if (cls == Short.class) { - return ShortDeserializer.class; - } - if (cls == Bytes.class) { - return BytesDeserializer.class; - } - if (cls == ByteBuffer.class) { - return ByteBufferDeserializer.class; - } - if (cls == byte[].class || cls == Byte[].class || cls == Object.class) { - return ByteArrayDeserializer.class; - } - // fall back to doing nothing - return ByteArrayDeserializer.class; - } - -} http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java index 2fa8c06..8f4d7a7 100644 --- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java @@ -28,10 +28,14 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.metamodel.MetaModelException; import org.apache.metamodel.QueryPostprocessDataContext; +import org.apache.metamodel.UpdateScript; +import org.apache.metamodel.UpdateSummary; +import org.apache.metamodel.UpdateableDataContext; import org.apache.metamodel.data.DataSet; import org.apache.metamodel.data.FirstRowDataSet; import org.apache.metamodel.data.MaxRowsDataSet; @@ -47,7 +51,7 @@ import org.apache.metamodel.schema.MutableTable; import org.apache.metamodel.schema.Schema; import org.apache.metamodel.schema.Table; -public class KafkaDataContext<K, V> extends QueryPostprocessDataContext { +public class KafkaDataContext<K, V> extends QueryPostprocessDataContext implements UpdateableDataContext { public static final String SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT = "metamodel.kafka.consumer.poll.timeout"; @@ -64,19 +68,19 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext { private final Class<K> keyClass; private final Class<V> valueClass; - private final ConsumerFactory consumerFactory; + private final ConsumerAndProducerFactory consumerAndProducerFactory; private final Supplier<Collection<String>> topicSupplier; public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, String bootstrapServers, Collection<String> topics) { - this(keyClass, valueClass, new KafkaConsumerFactory(bootstrapServers), () -> topics); + this(keyClass, valueClass, new KafkaConsumerAndProducerFactory(bootstrapServers), () -> topics); } - public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, ConsumerFactory consumerFactory, - Supplier<Collection<String>> topicSupplier) { + public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, + ConsumerAndProducerFactory consumerAndProducerFactory, Supplier<Collection<String>> topicSupplier) { this.keyClass = keyClass; this.valueClass = valueClass; - this.consumerFactory = consumerFactory; + this.consumerAndProducerFactory = consumerAndProducerFactory; this.topicSupplier = topicSupplier; } @@ -106,7 +110,7 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext { @Override protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) { final String topic = table.getName(); - final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass); + final Consumer<K, V> consumer = consumerAndProducerFactory.createConsumer(topic, keyClass, valueClass); final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); final List<TopicPartition> partitions = partitionInfos.stream().map(partitionInfo -> { return new TopicPartition(topic, partitionInfo.partition()); @@ -171,7 +175,7 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext { } final String topic = table.getName(); - final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass); + final Consumer<K, V> consumer = consumerAndProducerFactory.createConsumer(topic, keyClass, valueClass); // handle partition filtering final List<TopicPartition> assignedPartitions; @@ -258,4 +262,18 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext { return false; } } + + @Override + public UpdateSummary executeUpdate(UpdateScript update) { + final Producer<K, V> producer = consumerAndProducerFactory.createProducer(keyClass, valueClass); + final KafkaUpdateCallback<K, V> callback = new KafkaUpdateCallback<>(this, producer); + try { + update.run(callback); + } finally { + callback.flush(); + } + final UpdateSummary updateSummary = callback.getUpdateSummary(); + callback.close(); + return updateSummary; + } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java new file mode 100644 index 0000000..4253bce --- /dev/null +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.kafka; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.insert.AbstractRowInsertionBuilder; +import org.apache.metamodel.schema.Column; +import org.apache.metamodel.schema.Table; + +final class KafkaInsertBuilder<K, V> extends AbstractRowInsertionBuilder<KafkaUpdateCallback<K, V>> { + + public KafkaInsertBuilder(KafkaUpdateCallback<K, V> updateCallback, Table table) { + super(updateCallback, table); + } + + @SuppressWarnings("unchecked") + @Override + public void execute() throws MetaModelException { + final Column[] columns = getColumns(); + final Object[] values = getValues(); + + K key = null; + V value = null; + + for (int i = 0; i < columns.length; i++) { + if (columns[i].getName() == KafkaDataContext.COLUMN_KEY) { + key = (K) values[i]; + } + if (columns[i].getName() == KafkaDataContext.COLUMN_VALUE) { + value = (V) values[i]; + } + } + + getUpdateCallback().getProducer().send(new ProducerRecord<K, V>(getTable().getName(), key, value)); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java new file mode 100644 index 0000000..a7b6e96 --- /dev/null +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.kafka; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.metamodel.AbstractUpdateCallback; +import org.apache.metamodel.create.TableCreationBuilder; +import org.apache.metamodel.delete.RowDeletionBuilder; +import org.apache.metamodel.drop.TableDropBuilder; +import org.apache.metamodel.insert.RowInsertionBuilder; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; + +final class KafkaUpdateCallback<K, V> extends AbstractUpdateCallback { + + private final Producer<K, V> producer; + + public KafkaUpdateCallback(KafkaDataContext<K, V> kafkaDataContext, Producer<K, V> producer) { + super(kafkaDataContext); + this.producer = producer; + } + + @Override + public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException, + IllegalStateException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDropTableSupported() { + return false; + } + + @Override + public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + @Override + public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + return new KafkaInsertBuilder<>(this, table); + } + + @Override + public boolean isDeleteSupported() { + return false; + } + + @Override + public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + public Producer<K, V> getProducer() { + return producer; + } + + public void flush() { + producer.flush(); + } + + public void close() { + producer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java index 4b19940..491c96d 100644 --- a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java +++ b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java @@ -25,7 +25,7 @@ public class KafkaDataContextTest extends EasyMockSupport { @Test public void testGetSchemaInfo() { - final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class); + final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class); replayAll(); @@ -39,7 +39,7 @@ public class KafkaDataContextTest extends EasyMockSupport { @Test public void testQueryWithoutOptimization() { - final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class); + final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class); @SuppressWarnings("unchecked") final Consumer<String, String> consumer = createMock(Consumer.class); @@ -94,7 +94,7 @@ public class KafkaDataContextTest extends EasyMockSupport { @Test public void testQueryOptimizationByPartition() { - final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class); + final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class); @SuppressWarnings("unchecked") final Consumer<String, String> consumer = createMock(Consumer.class);
