http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
new file mode 100644
index 0000000..42b9682
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+/**
+ * Producer integration tests for the Kafka 0.10 connector. The test logic itself is
+ * inherited from {@link KafkaProducerTestBase}; this class only binds it to a JUnit test.
+ */
+@SuppressWarnings("serial")
+public class Kafka010ProducerITCase extends KafkaProducerTestBase {
+
+	/** Runs the shared custom-partitioning producer test. */
+	@Test
+	public void testCustomPartitioning() {
+		runCustomPartitioningTest();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..f15fd45
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@link KafkaTestEnvironment} implementation for Kafka 0.10.
+ *
+ * <p>{@link #prepare(int, Properties, boolean)} starts an embedded ZooKeeper ({@link TestingServer})
+ * and in-process {@link KafkaServer} brokers, each with its own temporary log directory;
+ * {@link #shutdown()} stops them again and deletes the temporary directories.
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private File tmpZkDir;
+	private File tmpKafkaParent;
+	private List<File> tmpKafkaDirs;
+	private List<KafkaServer> brokers;
+	private TestingServer zookeeper;
+	private String zookeeperConnectionString;
+	private String brokerConnectionString = "";
+	private Properties standardProps;
+	private Properties additionalServerProperties;
+	private boolean secureMode = false;
+	// 6 seconds is default. Seems to be too small for travis. 30 seconds
+	private int zkTimeout = 30000;
+
+	/**
+	 * Returns the bootstrap server list assembled in {@link #prepare(int, Properties, boolean)}:
+	 * one {@code host:port} entry per started broker, each followed by a comma.
+	 */
+	public String getBrokerConnectionString() {
+		return brokerConnectionString;
+	}
+
+	@Override
+	public Properties getStandardProperties() {
+		return standardProps;
+	}
+
+	/**
+	 * Returns the SASL_PLAINTEXT client/broker settings plus the ZooKeeper and metadata timeouts
+	 * when running in secure mode, or an empty {@link Properties} otherwise.
+	 */
+	@Override
+	public Properties getSecureProperties() {
+		Properties prop = new Properties();
+		if(secureMode) {
+			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+			prop.put("security.protocol", "SASL_PLAINTEXT");
+			prop.put("sasl.kerberos.service.name", "kafka");
+
+			//add special timeout for Travis
+			prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("metadata.fetch.timeout.ms","120000");
+		}
+		return prop;
+	}
+
+	@Override
+	public String getVersion() {
+		return "0.10";
+	}
+
+	@Override
+	public List<KafkaServer> getBrokers() {
+		return brokers;
+	}
+
+	@Override
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+		return new FlinkKafkaConsumer010<>(topics, readSchema, props);
+	}
+
+	@Override
+	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return new StreamSink<>(prod);
+	}
+
+	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return stream.addSink(prod);
+	}
+
+	@Override
+	public KafkaOffsetHandler createOffsetHandler(Properties props) {
+		return new KafkaOffsetHandlerImpl(props);
+	}
+
+	@Override
+	public void restartBroker(int leaderId) throws Exception {
+		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
+	}
+
+	/**
+	 * Polls the ZooKeeper metadata of {@code topic} (sleeping 150 ms between attempts) until its
+	 * first partition reports error code 0, and returns the id of that partition's leader broker.
+	 */
+	@Override
+	public int getLeaderToShutDown(String topic) throws Exception {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			MetadataResponse.PartitionMetadata firstPart = null;
+			do {
+				if (firstPart != null) {
+					LOG.info("Unable to find leader. error code {}", firstPart.error().code());
+					// not the first try. Sleep a bit
+					Thread.sleep(150);
+				}
+
+				List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
+				firstPart = partitionMetadata.get(0);
+			}
+			while (firstPart.error().code() != 0);
+
+			return firstPart.leader().id();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	@Override
+	public int getBrokerId(KafkaServer server) {
+		return server.config().brokerId();
+	}
+
+	@Override
+	public boolean isSecureRunSupported() {
+		return true;
+	}
+
+	/**
+	 * Starts an embedded ZooKeeper and {@code numKafkaServers} Kafka brokers in fresh temporary
+	 * directories, then fills in the broker connection string and the standard client properties.
+	 * In secure mode only a single broker is started and the ZooKeeper timeout is multiplied by 15.
+	 */
+	@Override
+	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+		//increase the timeout since in Travis ZK connection takes long time for secure connection.
+		if(secureMode) {
+			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+			numKafkaServers = 1;
+			zkTimeout = zkTimeout * 15;
+		}
+
+		this.additionalServerProperties = additionalServerProperties;
+		this.secureMode = secureMode;
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+		// '-' rather than '*': '*' is not a legal file-name character on every platform (e.g. Windows)
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+		for (int i = 0; i < numKafkaServers; i++) {
+			File tmpDir = new File(tmpKafkaParent, "server-" + i);
+			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+			tmpKafkaDirs.add(tmpDir);
+		}
+
+		zookeeper = null;
+		brokers = null;
+
+		try {
+			zookeeper = new TestingServer(- 1, tmpZkDir);
+			zookeeperConnectionString = zookeeper.getConnectString();
+			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
+
+			LOG.info("Starting KafkaServer");
+			brokers = new ArrayList<>(numKafkaServers);
+
+			// the listener protocol follows secureMode (see getKafkaServer), so look up that port
+			SecurityProtocol protocol = secureMode ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
+			for (int i = 0; i < numKafkaServers; i++) {
+				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+				brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(protocol)) + ",";
+			}
+
+			LOG.info("ZK and KafkaServer started.");
+		}
+		catch (Throwable t) {
+			LOG.error("Test setup failed", t);
+			fail("Test setup failed: " + t.getMessage());
+		}
+
+		standardProps = new Properties();
+		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+		standardProps.setProperty("group.id", "flink-tests");
+		standardProps.setProperty("enable.auto.commit", "false");
+		standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value)
+		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+	}
+
+	/**
+	 * Stops all brokers and ZooKeeper and deletes the temporary directories (best effort).
+	 * Tolerates a {@link #prepare(int, Properties, boolean)} that failed part-way.
+	 */
+	@Override
+	public void shutdown() {
+		// brokers is still null if prepare() failed before the broker list was created
+		if (brokers != null) {
+			for (KafkaServer broker : brokers) {
+				if (broker != null) {
+					broker.shutdown();
+				}
+			}
+			brokers.clear();
+		}
+
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+
+		// clean up the temp spaces
+
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception ignored) {
+				// best-effort cleanup: a leftover temp directory is not an error
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception ignored) {
+				// best-effort cleanup: a leftover temp directory is not an error
+			}
+		}
+	}
+
+	/**
+	 * Opens a new ZooKeeper client using the timeouts from the standard properties.
+	 * The caller owns the returned {@link ZkUtils} and must close it.
+	 */
+	public ZkUtils getZkUtils() {
+		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.parseInt(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.parseInt(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+		return ZkUtils.apply(creator, false);
+	}
+
+	/**
+	 * Creates {@code topic} and then polls ZooKeeper for up to about 30 seconds until it is visible.
+	 * Fails the test if the topic does not appear in time or if the wait is interrupted.
+	 */
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+		// create topic with one client
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
+		} finally {
+			zkUtils.close();
+		}
+
+		// validate that the topic has been created
+		final long deadline = System.currentTimeMillis() + 30000;
+		do {
+			try {
+				if(secureMode) {
+					//increase wait time since in Travis ZK timeout occurs frequently
+					int wait = zkTimeout / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+			} catch (InterruptedException e) {
+				// restore the interrupted state and stop waiting: with the flag set, every further
+				// sleep() would throw immediately and the loop would spin until the deadline
+				Thread.currentThread().interrupt();
+				fail("Interrupted while waiting for test topic " + topic + " to be created");
+			}
+
+			// check with a fresh ZK connection each time: AdminUtils.topicExists() on the
+			// connection that created the topic does not always return the correct result
+			ZkUtils checkZKConn = getZkUtils();
+			try {
+				if (AdminUtils.topicExists(checkZKConn, topic)) {
+					return;
+				}
+			} finally {
+				checkZKConn.close();
+			}
+		}
+		while (System.currentTimeMillis() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	/**
+	 * Deletes {@code topic} through {@link AdminUtils#deleteTopic}.
+	 */
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+			AdminUtils.deleteTopic(zkUtils, topic);
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/**
+	 * Starts a Kafka broker with the given id that logs to {@code tmpFolder}, on a port from {@link NetUtils#getAvailablePort()}.
+	 * A port conflict (a {@link KafkaException} caused by a {@link BindException}) is retried with a new port, up to 5 attempts.
+	 *
+	 * <p>Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 */
+	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+		Properties kafkaProperties = new Properties();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+		// for CI stability, increase zookeeper session timeout
+		kafkaProperties.put("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+		kafkaProperties.put("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+		if(additionalServerProperties != null) {
+			kafkaProperties.putAll(additionalServerProperties);
+		}
+
+		final int numTries = 5;
+
+		for (int i = 1; i <= numTries; i++) {
+			int kafkaPort = NetUtils.getAvailablePort();
+			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			//to support secure kafka cluster
+			if(secureMode) {
+				LOG.info("Adding Kafka secure configurations");
+				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
+			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+			try {
+				scala.Option<String> stringNone = scala.Option.apply(null);
+				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				server.startup();
+				return server;
+			}
+			catch (KafkaException e) {
+				if (e.getCause() instanceof BindException) {
+					// port conflict, retry...
+					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+				}
+				else {
+					throw e;
+				}
+			}
+		}
+
+		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+	}
+
+	/**
+	 * Reads committed offsets through a plain {@link KafkaConsumer} created from the given properties.
+	 * Static because it never touches the enclosing test environment.
+	 */
+	private static class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		public KafkaOffsetHandlerImpl(Properties props) {
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fbeb110
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/resources/log4j-test.properties
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml new file mode 100644 index 0000000..d1fecb6 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml @@ -0,0 +1,219 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-0.8_2.10</artifactId> + <name>flink-connector-kafka-0.8</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <kafka.version>0.8.2.2</kafka.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-curator-recipes</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base_2.10</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, + won't depend on flink-table. 
--> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-annotation</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + </configuration> + </plugin> + <!-- Relocate curator --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes combine.children="append"> + <include>org.apache.flink:flink-shaded-curator-recipes</include> + </includes> + </artifactSet> + <relocations combine.children="append"> + <relocation> + <pattern>org.apache.curator</pattern> + <shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java new file mode 100644 index 0000000..0aacccd --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.api.OffsetRequest; +import kafka.cluster.Broker; +import kafka.common.ErrorMapping; +import kafka.javaapi.PartitionMetadata; +import kafka.javaapi.TopicMetadata; +import kafka.javaapi.TopicMetadataRequest; +import kafka.javaapi.consumer.SimpleConsumer; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.util.NetUtils; +import org.apache.flink.util.PropertiesUtil; +import org.apache.flink.util.SerializedValue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; + +import java.net.InetAddress; +import java.net.URL; +import java.net.UnknownHostException; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import static org.apache.flink.util.PropertiesUtil.getInt; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from + * Apache Kafka 0.8.x. 
The consumer can run in multiple parallel instances, each of which will pull + * data from one or more Kafka partitions. + * + * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost + * during a failure, and that the computation processes elements "exactly once". + * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p> + * + * <p>Flink's Kafka Consumer is designed to be compatible with Kafka's High-Level Consumer API (0.8.x). + * Most of Kafka's configuration variables can be used with this consumer as well: + * <ul> + * <li>socket.timeout.ms</li> + * <li>socket.receive.buffer.bytes</li> + * <li>fetch.message.max.bytes</li> + * <li>auto.offset.reset with the values "largest", "smallest"</li> + * <li>fetch.wait.max.ms</li> + * </ul> + * + * <h1>Offset handling</h1> + * + * <p>Offsets whose records have been read and are checkpointed will be committed back to ZooKeeper + * by the offset handler. In addition, the offset handler finds the point where the source initially + * starts reading from the stream, when the streaming job is started.</p> + * + * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets + * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view + * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer + * has consumed a topic.</p> + * + * <p>If checkpointing is disabled, the consumer will periodically commit the current offset + * to ZooKeeper.</p> + * + * <p>When using a Kafka topic to send data between Flink jobs, we recommend using the + * {@code TypeInformationSerializationSchema} and {@code TypeInformationKeyValueSerializationSchema}.</p> + * + * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer + * is constructed.
That means that the client that submits the program needs to be able to + * reach the Kafka brokers or ZooKeeper.</p> + */ +public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { + + private static final long serialVersionUID = -6272159445203409112L; + + /** Configuration key for the number of retries for getting the partition info */ + public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry"; + + /** Default number of retries for getting the partition info. One retry means going through the full list of brokers */ + public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3; + + // ------------------------------------------------------------------------ + + /** The properties to parametrize the Kafka consumer and ZooKeeper client */ + private final Properties kafkaProperties; + + /** The behavior when encountering an invalid offset (see {@link OffsetRequest}) */ + private final long invalidOffsetBehavior; + + /** The interval in which to automatically commit (-1 if deactivated) */ + private final long autoCommitInterval; + + // ------------------------------------------------------------------------ + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.8.x + * + * @param topic + * The name of the topic that should be consumed. + * @param valueDeserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { + this(Collections.singletonList(topic), valueDeserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.8.x + * + * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. 
+ * + * @param topic + * The name of the topic that should be consumed. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { + this(Collections.singletonList(topic), deserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.8.x + * + * This constructor allows passing multiple topics to the consumer. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. + */ + public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { + this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.8.x + * + * This constructor allows passing multiple topics and a key/value deserialization schema. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. 
+ */ + public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(topics, deserializer); + + checkNotNull(topics, "topics"); + this.kafkaProperties = checkNotNull(props, "props"); + + // validate the zookeeper properties + validateZooKeeperConfig(props); + + this.invalidOffsetBehavior = getInvalidOffsetBehavior(props); + this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000); + } + + @Override + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + + boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false")); + + return new Kafka08Fetcher<>(sourceContext, thisSubtaskPartitions, + watermarksPeriodic, watermarksPunctuated, + runtimeContext, deserializer, kafkaProperties, + invalidOffsetBehavior, autoCommitInterval, useMetrics); + } + + @Override + protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) { + // Connect to a broker to get the partitions for all topics + List<KafkaTopicPartition> partitionInfos = + KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, kafkaProperties)); + + if (partitionInfos.size() == 0) { + throw new RuntimeException( + "Unable to retrieve any partitions for the requested topics " + topics + + ". 
Please check previous log entries"); + } + + if (LOG.isInfoEnabled()) { + logPartitionInfo(LOG, partitionInfos); + } + + return partitionInfos; + } + + // ------------------------------------------------------------------------ + // Kafka / ZooKeeper communication utilities + // ------------------------------------------------------------------------ + + /** + * Send request to Kafka to get partitions for topic. + * + * @param topics The name of the topics. + * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. + */ + public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) { + String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES); + + checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + String[] seedBrokers = seedBrokersConfString.split(","); + List<KafkaTopicPartitionLeader> partitions = new ArrayList<>(); + + final String clientId = "flink-kafka-consumer-partition-lookup"; + final int soTimeout = getInt(properties, "socket.timeout.ms", 30000); + final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536); + + Random rnd = new Random(); + retryLoop: for (int retry = 0; retry < numRetries; retry++) { + // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the + // parallel source instances start. Still, we try all available brokers. 
+ int index = rnd.nextInt(seedBrokers.length); + brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) { + String seedBroker = seedBrokers[index]; + LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries); + if (++index == seedBrokers.length) { + index = 0; + } + + URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker); + SimpleConsumer consumer = null; + try { + consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId); + + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); + + List<TopicMetadata> metaData = resp.topicsMetadata(); + + // clear in case we have an incomplete list from previous tries + partitions.clear(); + for (TopicMetadata item : metaData) { + if (item.errorCode() != ErrorMapping.NoError()) { + // warn and try more brokers + LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " + + "for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage()); + continue brokersLoop; + } + if (!topics.contains(item.topic())) { + LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ..."); + continue brokersLoop; + } + for (PartitionMetadata part : item.partitionsMetadata()) { + Node leader = brokerToNode(part.leader()); + KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId()); + KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader); + partitions.add(pInfo); + } + } + break retryLoop; // leave the loop through the brokers + } catch (Exception e) { + //validates seed brokers in case of a ClosedChannelException + validateSeedBrokers(seedBrokers, e); + LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." + + "" + e.getClass() + ". 
Message: " + e.getMessage()); + LOG.debug("Detailed trace", e); + // we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // sleep shorter. + } + } finally { + if (consumer != null) { + consumer.close(); + } + } + } // brokers loop + } // retries loop + return partitions; + } + + /** + * Turn a broker instance into a node instance + * @param broker broker instance + * @return Node representing the given broker + */ + private static Node brokerToNode(Broker broker) { + return new Node(broker.id(), broker.host(), broker.port()); + } + + /** + * Validate the ZK configuration, checking for required parameters + * @param props Properties to check + */ + protected static void validateZooKeeperConfig(Properties props) { + if (props.getProperty("zookeeper.connect") == null) { + throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties"); + } + if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) { + throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG + + "' has not been set in the properties"); + } + + try { + //noinspection ResultOfMethodCallIgnored + Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0")); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer"); + } + + try { + //noinspection ResultOfMethodCallIgnored + Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0")); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer"); + } + } + + /** + * Validate that at least one seed broker is valid in case of a + * ClosedChannelException. + * + * @param seedBrokers + * array containing the seed brokers e.g. 
["host1:port1", + * "host2:port2"] + * @param exception + * instance + */ + private static void validateSeedBrokers(String[] seedBrokers, Exception exception) { + if (!(exception instanceof ClosedChannelException)) { + return; + } + int unknownHosts = 0; + for (String broker : seedBrokers) { + URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim()); + try { + InetAddress.getByName(brokerUrl.getHost()); + } catch (UnknownHostException e) { + unknownHosts++; + } + } + // throw meaningful exception if all the provided hosts are invalid + if (unknownHosts == seedBrokers.length) { + throw new IllegalArgumentException("All the servers provided in: '" + + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)"); + } + } + + private static long getInvalidOffsetBehavior(Properties config) { + final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); + if (val.equals("none")) { + throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + + "' value 'none'. 
Possible values: 'latest', 'largest', or 'earliest'."); + } + else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9 + return OffsetRequest.LatestTime(); + } else { + return OffsetRequest.EarliestTime(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java new file mode 100644 index 0000000..56ccd0b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
+ *
+ * @param <T> The type of records produced by the consumer.
+ * @deprecated Use {@link FlinkKafkaConsumer08} instead; this class only forwards to it.
+ */
+@Deprecated
+public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {
+
+	private static final long serialVersionUID = -5649906773771949146L;
+
+	/**
+	 * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
+	 *
+	 * @param topic The name of the topic that should be consumed.
+	 * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 * @deprecated Use {@link FlinkKafkaConsumer08#FlinkKafkaConsumer08(String, DeserializationSchema, Properties)} instead.
+	 */
+	@Deprecated
+	public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		super(topic, valueDeserializer, props);
+	}
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
new file mode 100644
index 0000000..0520336
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead. + */ +@Deprecated +public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> { + + private static final long serialVersionUID = -5649906773771949146L; + + /** + * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead. + */ + @Deprecated + public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { + super(topic, valueDeserializer, props); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java new file mode 100644 index 0000000..1c2e0b7 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import java.util.Properties; + + +/** + * THIS CLASS IS DEPRECATED. Use FlinkKafkaProducer08 instead. 
+ */ +@Deprecated +public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> { + + @Deprecated + public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { + super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null); + } + + @Deprecated + public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { + super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null); + } + + @Deprecated + public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { + super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + + } + + @Deprecated + public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { + super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null); + } + + @Deprecated + public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { + super(topicId, serializationSchema, producerConfig, null); + } + + @Deprecated + public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { + super(topicId, serializationSchema, producerConfig, customPartitioner); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
new file mode 100644
index 0000000..65de5fc
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
+ *
+ * <p>Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	// ------------------- Keyless serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. Records are assigned to partitions by a {@link FixedPartitioner}.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 */
+	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. Records are assigned to partitions by a {@link FixedPartitioner}.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer with a keyless serialization schema.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
+	// ------------------- Key/Value serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. Records are assigned to partitions by a {@link FixedPartitioner}.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 */
+	public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. Records are assigned to partitions by a {@link FixedPartitioner}.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
+	/**
+	 * Blocks until all pending records have been confirmed.
+	 *
+	 * <p>The Kafka 0.8 producer doesn't support flushing, so this waits on {@code pendingRecordsLock}
+	 * until {@code pendingRecords} is no longer positive.
+	 *
+	 * @throws RuntimeException if the waiting thread is interrupted (e.g. because the Task has been
+	 *         cancelled); the thread's interrupt flag is restored before throwing
+	 */
+	@Override
+	protected void flush() {
+		synchronized (pendingRecordsLock) {
+			while (pendingRecords > 0) {
+				try {
+					pendingRecordsLock.wait();
+				} catch (InterruptedException e) {
+					// this can be interrupted when the Task has been cancelled.
+					// keep the interrupt status visible to the code further up the stack
+					Thread.currentThread().interrupt();
+					// by throwing an exception, we ensure that this checkpoint doesn't get confirmed
+					throw new RuntimeException("Flushing got interrupted while checkpointing", e);
+				}
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
new file mode 100644
index 0000000..b155576
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +/** + * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. + */ +public class Kafka08JsonTableSink extends KafkaJsonTableSink { + + /** + * Creates {@link KafkaTableSink} for Kafka 0.8 + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + super(topic, properties, partitioner); + } + + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner); + } + + @Override + protected Kafka08JsonTableSink createCopy() { + return new Kafka08JsonTableSink(topic, properties, partitioner); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
new file mode 100644
index 0000000..63bb57e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.sources.StreamTableSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * JSON {@link StreamTableSource} that reads from Kafka 0.8.
+ */
+public class Kafka08JsonTableSource extends KafkaJsonTableSource {
+
+	/**
+	 * Creates a Kafka 0.8 JSON {@link StreamTableSource} whose row field types are given as
+	 * {@link TypeInformation} instances.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	public Kafka08JsonTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a Kafka 0.8 JSON {@link StreamTableSource} whose row field types are given as classes.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param fieldNames Row field names.
+	 * @param fieldTypes Row field types.
+	 */
+	public Kafka08JsonTableSource(String topic, Properties properties, String[] fieldNames, Class<?>[] fieldTypes) {
+		super(topic, properties, fieldNames, fieldTypes);
+	}
+
+	/**
+	 * Creates a {@link FlinkKafkaConsumer08} for the given topic, properties and deserialization schema.
+	 */
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		FlinkKafkaConsumer08<Row> consumer = new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
+		return consumer;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
new file mode 100644
index 0000000..8f51237
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.8. + */ +public class Kafka08TableSource extends KafkaTableSource { + + /** + * Creates a Kafka 0.8 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka08TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.8 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka08TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + Class<?>[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java new file mode 100644 index 0000000..23ff276 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import static java.util.Objects.requireNonNull; + +/** + * A special form of blocking queue with two additions: + * <ol> + * <li>The queue can be closed atomically when empty. Adding elements after the queue + * is closed fails. This allows queue consumers to atomically discover that no elements + * are available and mark themselves as shut down.</li> + * <li>The queue allows to poll batches of elements in one polling call.</li> + * </ol> + * + * The queue has no capacity restriction and is safe for multiple producers and consumers. + * + * <p>Note: Null elements are prohibited. + * + * @param <E> The type of elements in the queue. + */ +public class ClosableBlockingQueue<E> { + + /** The lock used to make queue accesses and open checks atomic */ + private final ReentrantLock lock; + + /** The condition on which blocking get-calls wait if the queue is empty */ + private final Condition nonEmpty; + + /** The deque of elements */ + private final ArrayDeque<E> elements; + + /** Flag marking the status of the queue */ + private volatile boolean open; + + // ------------------------------------------------------------------------ + + /** + * Creates a new empty queue. + */ + public ClosableBlockingQueue() { + this(10); + } + + /** + * Creates a new empty queue, reserving space for at least the specified number + * of elements. The queu can still grow, of more elements are added than the + * reserved space. + * + * @param initialSize The number of elements to reserve space for. 
+ */ + public ClosableBlockingQueue(int initialSize) { + this.lock = new ReentrantLock(true); + this.nonEmpty = this.lock.newCondition(); + + this.elements = new ArrayDeque<>(initialSize); + this.open = true; + + + } + + /** + * Creates a new queue that contains the given elements. + * + * @param initialElements The elements to initially add to the queue. + */ + public ClosableBlockingQueue(Collection<? extends E> initialElements) { + this(initialElements.size()); + this.elements.addAll(initialElements); + } + + // ------------------------------------------------------------------------ + // Size and status + // ------------------------------------------------------------------------ + + /** + * Gets the number of elements currently in the queue. + * @return The number of elements currently in the queue. + */ + public int size() { + lock.lock(); + try { + return elements.size(); + } finally { + lock.unlock(); + } + } + + /** + * Checks whether the queue is empty (has no elements). + * @return True, if the queue is empty; false, if it is non-empty. + */ + public boolean isEmpty() { + return size() == 0; + } + + /** + * Checks whether the queue is currently open, meaning elements can be added and polled. + * @return True, if the queue is open; false, if it is closed. + */ + public boolean isOpen() { + return open; + } + + /** + * Tries to close the queue. Closing the queue only succeeds when no elements are + * in the queue when this method is called. Checking whether the queue is empty, and + * marking the queue as closed is one atomic operation. + * + * @return True, if the queue is closed, false if the queue remains open. 
+ */ + public boolean close() { + lock.lock(); + try { + if (open) { + if (elements.isEmpty()) { + open = false; + nonEmpty.signalAll(); + return true; + } else { + return false; + } + } + else { + // already closed + return true; + } + } finally { + lock.unlock(); + } + } + + // ------------------------------------------------------------------------ + // Adding / Removing elements + // ------------------------------------------------------------------------ + + /** + * Tries to add an element to the queue, if the queue is still open. Checking whether the queue + * is open and adding the element is one atomic operation. + * + * <p>Unlike the {@link #add(Object)} method, this method never throws an exception, + * but only indicates via the return code if the element was added or the + * queue was closed. + * + * @param element The element to add. + * @return True, if the element was added, false if the queue was closes. + */ + public boolean addIfOpen(E element) { + requireNonNull(element); + + lock.lock(); + try { + if (open) { + elements.addLast(element); + if (elements.size() == 1) { + nonEmpty.signalAll(); + } + } + return open; + } finally { + lock.unlock(); + } + } + + /** + * Adds the element to the queue, or fails with an exception, if the queue is closed. + * Checking whether the queue is open and adding the element is one atomic operation. + * + * @param element The element to add. + * @throws IllegalStateException Thrown, if the queue is closed. + */ + public void add(E element) throws IllegalStateException { + requireNonNull(element); + + lock.lock(); + try { + if (open) { + elements.addLast(element); + if (elements.size() == 1) { + nonEmpty.signalAll(); + } + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns the queue's next element without removing it, if the queue is non-empty. + * Otherwise, returns null. 
+ * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and getting the next element is one atomic operation. + * + * <p>This method never blocks. + * + * @return The queue's next element, or null, if the queue is empty. + * @throws IllegalStateException Thrown, if the queue is closed. + */ + public E peek() { + lock.lock(); + try { + if (open) { + if (elements.size() > 0) { + return elements.getFirst(); + } else { + return null; + } + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns the queue's next element and removes it, the queue is non-empty. + * Otherwise, this method returns null. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * <p>This method never blocks. + * + * @return The queue's next element, or null, if the queue is empty. + * @throws IllegalStateException Thrown, if the queue is closed. + */ + public E poll() { + lock.lock(); + try { + if (open) { + if (elements.size() > 0) { + return elements.removeFirst(); + } else { + return null; + } + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns all of the queue's current elements in a list, if the queue is non-empty. + * Otherwise, this method returns null. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the elements is one atomic operation. + * + * <p>This method never blocks. + * + * @return All of the queue's elements, or null, if the queue is empty. + * @throws IllegalStateException Thrown, if the queue is closed. 
+ */ + public List<E> pollBatch() { + lock.lock(); + try { + if (open) { + if (elements.size() > 0) { + ArrayList<E> result = new ArrayList<>(elements); + elements.clear(); + return result; + } else { + return null; + } + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns the next element in the queue. If the queue is empty, this method + * waits until at least one element is added. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * @return The next element in the queue, never null. + * + * @throws IllegalStateException Thrown, if the queue is closed. + * @throws InterruptedException Throw, if the thread is interrupted while waiting for an + * element to be added. + */ + public E getElementBlocking() throws InterruptedException { + lock.lock(); + try { + while (open && elements.isEmpty()) { + nonEmpty.await(); + } + + if (open) { + return elements.removeFirst(); + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Returns the next element in the queue. If the queue is empty, this method + * waits at most a certain time until an element becomes available. If no element + * is available after that time, the method returns null. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * @param timeoutMillis The number of milliseconds to block, at most. + * @return The next element in the queue, or null, if the timeout expires before an element is available. + * + * @throws IllegalStateException Thrown, if the queue is closed. + * @throws InterruptedException Throw, if the thread is interrupted while waiting for an + * element to be added. 
+ */ + public E getElementBlocking(long timeoutMillis) throws InterruptedException { + if (timeoutMillis == 0L) { + // wait forever case + return getElementBlocking(); + } else if (timeoutMillis < 0L) { + throw new IllegalArgumentException("invalid timeout"); + } + + final long deadline = System.currentTimeMillis() + timeoutMillis; + + lock.lock(); + try { + while (open && elements.isEmpty() && timeoutMillis > 0) { + nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS); + timeoutMillis = deadline - System.currentTimeMillis(); + } + + if (!open) { + throw new IllegalStateException("queue is closed"); + } + else if (elements.isEmpty()) { + return null; + } else { + return elements.removeFirst(); + } + } finally { + lock.unlock(); + } + } + + /** + * Gets all the elements found in the list, or blocks until at least one element + * was added. If the queue is empty when this method is called, it blocks until + * at least one element is added. + * + * <p>This method always returns a list with at least one element. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * @return A list with all elements in the queue, always at least one element. + * + * @throws IllegalStateException Thrown, if the queue is closed. + * @throws InterruptedException Throw, if the thread is interrupted while waiting for an + * element to be added. + */ + public List<E> getBatchBlocking() throws InterruptedException { + lock.lock(); + try { + while (open && elements.isEmpty()) { + nonEmpty.await(); + } + if (open) { + ArrayList<E> result = new ArrayList<>(elements); + elements.clear(); + return result; + } else { + throw new IllegalStateException("queue is closed"); + } + } finally { + lock.unlock(); + } + } + + /** + * Gets all the elements found in the list, or blocks until at least one element + * was added. 
This method is similar as {@link #getBatchBlocking()}, but takes + * a number of milliseconds that the method will maximally wait before returning. + * + * <p>This method never returns null, but an empty list, if the queue is empty when + * the method is called and the request times out before an element was added. + * + * <p>The method throws an {@code IllegalStateException} if the queue is closed. + * Checking whether the queue is open and removing the next element is one atomic operation. + * + * @param timeoutMillis The number of milliseconds to wait, at most. + * @return A list with all elements in the queue, possible an empty list. + * + * @throws IllegalStateException Thrown, if the queue is closed. + * @throws InterruptedException Throw, if the thread is interrupted while waiting for an + * element to be added. + */ + public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException { + if (timeoutMillis == 0L) { + // wait forever case + return getBatchBlocking(); + } else if (timeoutMillis < 0L) { + throw new IllegalArgumentException("invalid timeout"); + } + + final long deadline = System.currentTimeMillis() + timeoutMillis; + + lock.lock(); + try { + while (open && elements.isEmpty() && timeoutMillis > 0) { + nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS); + timeoutMillis = deadline - System.currentTimeMillis(); + } + + if (!open) { + throw new IllegalStateException("queue is closed"); + } + else if (elements.isEmpty()) { + return Collections.emptyList(); + } + else { + ArrayList<E> result = new ArrayList<>(elements); + elements.clear(); + return result; + } + } finally { + lock.unlock(); + } + } + + // ------------------------------------------------------------------------ + // Standard Utilities + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + int hashCode = 17; + for (E element : elements) { + hashCode = 31 * hashCode + element.hashCode(); + } + return 
hashCode; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) { + @SuppressWarnings("unchecked") + ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj; + + if (this.elements.size() == that.elements.size()) { + Iterator<E> thisElements = this.elements.iterator(); + for (E thatNext : that.elements) { + E thisNext = thisElements.next(); + if (!(thisNext == null ? thatNext == null : thisNext.equals(thatNext))) { + return false; + } + } + return true; + } else { + return false; + } + } else { + return false; + } + } + + @Override + public String toString() { + return elements.toString(); + } +}
