This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch no-kafka08
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git

commit 0bd0144aeea4bb85afb1755d897c79ae1489d88f
Author: Adrian Cole <[email protected]>
AuthorDate: Tue May 7 16:34:52 2019 +0800

    Removes Kafka 0.8 support (KAFKA_ZOOKEEPER)

    This removes support for Kafka 0.8 (last updated almost 4 years ago).
    Notably, this means those using `KAFKA_ZOOKEEPER` to configure their
    broker need to switch to `KAFKA_BOOTSTRAP_SERVERS` instead.

    Note: Kafka 0.8 was not packaged into zipkin-server, it was an optional
    add-on. However, our docker image was created in such a way that it felt
    like it was available by default.

    See https://lists.apache.org/thread.html/432df5a806ee27dd959ded5ebf5e7cc6bd4370f6b1b1daf7bf594e80@%3Cdev.zipkin.apache.org%3E
---
 .travis.yml | 6 -
 zipkin-autoconfigure/collector-kafka08/README.md | 135 ----------
 zipkin-autoconfigure/collector-kafka08/pom.xml | 104 --------
 .../kafka08/KafkaZooKeeperSetCondition.java | 45 ----
 .../ZipkinKafka08CollectorAutoConfiguration.java | 63 -----
 .../kafka08/ZipkinKafkaCollectorProperties.java | 90 -------
 .../src/main/resources/META-INF/spring.factories | 2 -
 .../src/main/resources/zipkin-server-kafka08.yml | 7 -
 .../autoconfigure/collector/kafka08/Access.java | 42 ----
 ...ipkinKafka08CollectorAutoConfigurationTest.java | 111 ---------
 .../kafka/v1/NestedPropertyOverrideTest.java | 46 ----
 zipkin-autoconfigure/pom.xml | 1 -
 zipkin-collector/kafka/README.md | 2 +-
 zipkin-collector/kafka08/README.md | 41 ---
 zipkin-collector/kafka08/pom.xml | 59 -----
 .../zipkin2/collector/kafka08/KafkaCollector.java | 277 ---------------------
 .../collector/kafka08/KafkaStreamProcessor.java | 84 -------
 .../collector/kafka08/ITKafkaCollector.java | 250 -------------------
 .../zipkin2/collector/kafka08/KafkaTestGraph.java | 51 ----
 .../kafka08/src/test/resources/log4j.properties | 7 -
 .../kafka08/src/test/resources/log4j2.properties | 11 -
 zipkin-collector/pom.xml | 1 -
 zipkin-server/README.md | 8 -
 23 files changed, 1 insertion(+), 1442 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 
9747dd9..74e4a50f 100755 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,6 @@ cache: # zipkin-ui gets dependencies via NPM - $HOME/.npm - $HOME/.m2 - - $HOME/kafka_2.11-0.8.2.2 language: java @@ -28,11 +27,6 @@ before_install: # Required for Elasticsearch 5 (See https://github.com/docker-library/docs/tree/master/elasticsearch#host-setup) - sudo sysctl -w vm.max_map_count=262144 - # Manually install and run zk+kafka as it isn't an available service - - test -d $HOME/kafka_2.11-0.8.2.2/bin || curl -sSL https://archive.apache.org/dist/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz | bash -c '(cd $HOME; tar -xzf -)' - - nohup bash -c "cd $HOME/kafka_2.11-0.8.2.2 && bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &" - - nohup bash -c "cd $HOME/kafka_2.11-0.8.2.2 && bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &" - # Quiet Maven invoker logs (Downloading... when running zipkin-server/src/it) - echo "MAVEN_OPTS='-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn'" > ~/.mavenrc diff --git a/zipkin-autoconfigure/collector-kafka08/README.md b/zipkin-autoconfigure/collector-kafka08/README.md deleted file mode 100644 index 558b74d..0000000 --- a/zipkin-autoconfigure/collector-kafka08/README.md +++ /dev/null @@ -1,135 +0,0 @@ -# Kafka 0.8 Collector Auto-configure Module - -This module provides support for running the kafa 0.8 collector as a -component of Zipkin server. To activate this collector, reference the -module jar when running the Zipkin server and configure the ZooKeeper -connection string via the `KAFKA_ZOOKEEPER` environment -variable or `zipkin.collector.kafka.zookeeper` property. - -## Quick start - -JRE 8 is required to run Zipkin server. 
- -Fetch the latest released -[executable jar for Zipkin server](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec) -and -[autoconfigure module jar for the kafka collector](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-autoconfigure-collector-kafka08&v=LATEST&c=module). -Run Zipkin server with the Kafka 0.10+ collector enabled. - -For example: - -```bash -$ curl -sSL https://zipkin.io/quickstart.sh | bash -s -$ curl -sSL https://zipkin.io/quickstart.sh | bash -s io.zipkin.java:zipkin-autoconfigure-collector-kafka08:LATEST:module kafka08.jar -$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \ - java \ - -Dloader.path='kafka08.jar,kafka08.jar!/lib' \ - -Dspring.profiles.active=kafka08 \ - -cp zipkin.jar \ - org.springframework.boot.loader.PropertiesLauncher -``` - -After executing these steps, the Zipkin UI will be available -[http://localhost:9411](http://localhost:9411) or port 9411 of the remote host the Zipkin server -was started on. - -The Zipkin server can be further configured as described in the -[Zipkin server documentation](../../zipkin-server/README.md). - -## How this works - -The Zipkin server executable jar and the autoconfigure module jar for -the kafka collector are required. The module jar contains the code for -loading and configuring the kafka collector, and any dependencies that -are not already packaged in the Zipkin server jar -(e.g. zipkin-collector-kafka08, kafka-clients). - -Using PropertiesLauncher as the main class runs the Zipkin server -executable jar the same as it would be if executed using -`java -jar zipkin.jar`, except it provides the option to load resources -from outside the executable jar into the classpath. Those external -resources are specified using the `loader.path` system property. 
In this -case, it is configured to load the kafka collector module jar -(`zipkin-autoconfigure-collector-kafka08-module.jar`) and the jar files -contained in the `lib/` directory within that module jar -(`zipkin-autoconfigure-collector-kafka08-module.jar!/lib`). - -The `spring.profiles=kafka08` system property causes configuration from -[zipkin-server-kafka08.yml](src/main/resources/zipkin-server-kafka08.yml) -to be loaded. - -For more information on how this works, see [Spring Boot's documentation -on the executable jar format](https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html). The -[section on PropertiesLauncher](https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html#executable-jar-property-launcher-features) -has more detail on how the external module jar and the libraries it -contains are loaded. - -## Configuration - -The following configuration points apply apply when `KAFKA_ZOOKEEPER` or -`zipkin.collector.kafka.zookeeper` is set. They can be configured by -setting an environment variable or by setting a java system property -using the `-Dproperty.name=value` command line argument. Some settings -correspond to "Consumer Configs" in [Kafka 0.8 documentation](https://kafka.apache.org/082/documentation.html#consumerconfigs). - -Environment Variable | Property | Consumer Config | Description ---- | --- | --- | --- -`KAFKA_ZOOKEEPER` | `zipkin.collector.kafka.zookeeper` | zookeeper.connect | Comma-separated list of zookeeper host/ports, ex. 127.0.0.1:2181. No default -`KAFKA_GROUP_ID` | `zipkin.collector.kafka.group-id` | group.id | The consumer group this process is consuming on behalf of. Defaults to `zipkin` -`KAFKA_TOPIC` | `zipkin.collector.kafka.topic` | N/A | The topic that zipkin spans will be consumed from. Defaults to `zipkin` -`KAFKA_STREAMS` | `zipkin.collector.kafka.streams` | N/A | Count of threads consuming the topic. 
Defaults to `1` - -### Other Kafka consumer properties -You may need to set other [Kafka consumer properties](https://kafka.apache.org/082/documentation.html#consumerconfigs), in -addition to the ones with explicit properties defined by the collector. -In this case, you need to prefix that property name with -`zipkin.collector.kafka.overrides` and pass it as a system property argument. - -For example, to override `auto.offset.reset`, you can set a system property named -`zipkin.collector.kafka.overrides.auto.offset.reset`: - -```bash -$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \ - java \ - -Dloader.path='kafka08.jar,kafka08.jar!/lib' \ - -Dspring.profiles.active=kafka08 \ - -Dzipkin.collector.kafka.overrides.auto.offset.reset=latest \ - -cp zipkin.jar \ - org.springframework.boot.loader.PropertiesLauncher -``` - -### Examples - -Multiple ZooKeeper servers: - -```bash -$ KAFKA_ZOOKEEPER=zk1:2181,zk2:2181 \ - java \ - -Dloader.path='kafka08.jar,kafka08.jar!/lib' \ - -Dspring.profiles.active=kafka08 \ - -cp zipkin.jar \ - org.springframework.boot.loader.PropertiesLauncher -``` - -Alternate topic name(s): - -```bash -$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \ - java \ - -Dloader.path='kafka08.jar,kafka08.jar!/lib' \ - -Dspring.profiles.active=kafka08 \ - -Dzipkin.collector.kafka.topic=zapkin,zipken \ - -cp zipkin.jar \ - org.springframework.boot.loader.PropertiesLauncher -``` - -Specifying ZooKeeper as a system property, instead of an environment variable: - -```bash -$ java \ - -Dloader.path='kafka08.jar,kafka08.jar!/lib' \ - -Dspring.profiles.active=kafka08 \ - -Dzipkin.collector.kafka.zookeeper=127.0.0.1:2181 \ - -cp zipkin.jar \ - org.springframework.boot.loader.PropertiesLauncher -``` diff --git a/zipkin-autoconfigure/collector-kafka08/pom.xml b/zipkin-autoconfigure/collector-kafka08/pom.xml deleted file mode 100644 index 7d384ab..0000000 --- a/zipkin-autoconfigure/collector-kafka08/pom.xml +++ /dev/null @@ -1,104 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed 
to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.zipkin</groupId> - <artifactId>zipkin-autoconfigure-parent</artifactId> - <version>2.13.1-SNAPSHOT</version> - </parent> - - <artifactId>zipkin-autoconfigure-collector-kafka08</artifactId> - <name>Auto Configuration: Kafka Collector</name> - - <properties> - <main.basedir>${project.basedir}/../..</main.basedir> - </properties> - - <dependencies> - <dependency> - <groupId>${project.groupId}.zipkin2</groupId> - <artifactId>zipkin-collector-kafka08</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <!-- com.101tec:zkclient has a log4j dep, re-route it with the bridge - https://logging.apache.org/log4j/2.x/manual/migration.html --> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-1.2-api</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - 
<groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-maven-plugin</artifactId> - <version>${spring-boot.version}</version> - <configuration> - <layoutFactory implementation="zipkin.layout.ZipkinLayoutFactory"> - <name>zipkin</name> - </layoutFactory> - <classifier>module</classifier> - <!-- exclude dependencies already packaged in zipkin-server --> - <!-- https://github.com/spring-projects/spring-boot/issues/3426 transitive exclude doesn't work --> - <excludeGroupIds> - org.springframework.boot,org.springframework,org.slf4j,commons-logging,com.google.code.gson - </excludeGroupIds> - <excludes> - <!-- excludes direct dependency instead of the group id, as otherwise we'd exclude ourselves --> - <exclude> - <groupId>${project.groupId}.zipkin2</groupId> - <artifactId>zipkin</artifactId> - </exclude> - <exclude> - <groupId>${project.groupId}.zipkin2</groupId> - <artifactId>zipkin-collector</artifactId> - </exclude> - <!-- excludes already packaged logging libraries in the server. 
Can't use group ID or we - would miss the api bridge --> - <exclude> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - </exclude> - <exclude> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - </exclude> - </excludes> - </configuration> - <dependencies> - <dependency> - <groupId>org.apache.zipkin.layout</groupId> - <artifactId>zipkin-layout-factory</artifactId> - <version>${zipkin-layout-factory.version}</version> - </dependency> - </dependencies> - </plugin> - </plugins> - </build> -</project> diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java deleted file mode 100644 index cc86eea..0000000 --- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package zipkin2.autoconfigure.collector.kafka08; - -import org.springframework.boot.autoconfigure.condition.ConditionOutcome; -import org.springframework.boot.autoconfigure.condition.SpringBootCondition; -import org.springframework.context.annotation.ConditionContext; -import org.springframework.core.type.AnnotatedTypeMetadata; - -/** - * This condition passes when {@link ZipkinKafkaCollectorProperties#getZookeeper()} is set to - * non-empty. - * - * <p>This is here because the yaml defaults this property to empty like this, and spring-boot - * doesn't have an option to treat empty properties as unset. - * - * <pre>{@code - * zookeeper: ${KAFKA_ZOOKEEPER:} - * }</pre> - */ -final class KafkaZooKeeperSetCondition extends SpringBootCondition { - static final String PROPERTY_NAME = "zipkin.collector.kafka.zookeeper"; - - @Override - public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) { - String kafkaZookeeper = context.getEnvironment().getProperty(PROPERTY_NAME); - return kafkaZookeeper == null || kafkaZookeeper.isEmpty() - ? ConditionOutcome.noMatch(PROPERTY_NAME + " isn't set") - : ConditionOutcome.match(); - } -} diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java deleted file mode 100644 index 54b8f8a..0000000 --- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package zipkin2.autoconfigure.collector.kafka08; - -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Configuration; -import zipkin2.collector.CollectorMetrics; -import zipkin2.collector.CollectorSampler; -import zipkin2.collector.kafka08.KafkaCollector; -import zipkin2.storage.StorageComponent; - -/** - * This collector consumes a topic, decodes spans from thrift messages and stores them subject to - * sampling policy. - */ -@Configuration -@EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class) -@Conditional(KafkaZooKeeperSetCondition.class) -class ZipkinKafka08CollectorAutoConfiguration { - - /** - * This launches a thread to run start. This prevents a several second hang, or worse crash if - * zookeeper isn't running, yet. 
- */ - @Bean - KafkaCollector kafka( - ZipkinKafkaCollectorProperties kafka, - CollectorSampler sampler, - CollectorMetrics metrics, - StorageComponent storage) { - final KafkaCollector result = - kafka.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build(); - - // don't use @Bean(initMethod = "start") as it can crash the process if zookeeper is down - Thread start = - new Thread("start " + result.getClass().getSimpleName()) { - @Override - public void run() { - result.start(); - } - }; - start.setDaemon(true); - start.start(); - - return result; - } -} diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java deleted file mode 100644 index 28e9663..0000000 --- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package zipkin2.autoconfigure.collector.kafka08; - -import java.util.LinkedHashMap; -import java.util.Map; -import org.springframework.boot.context.properties.ConfigurationProperties; -import zipkin2.collector.kafka08.KafkaCollector; - -@ConfigurationProperties("zipkin.collector.kafka") -class ZipkinKafkaCollectorProperties { - private String topic = "zipkin"; - private String zookeeper; - private String groupId = "zipkin"; - private int streams = 1; - private int maxMessageSize = 1024 * 1024; - private Map<String, String> overrides = new LinkedHashMap<>(); - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public String getZookeeper() { - return zookeeper; - } - - public void setZookeeper(String zookeeper) { - this.zookeeper = "".equals(zookeeper) ? null : zookeeper; - } - - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } - - public int getStreams() { - return streams; - } - - public void setStreams(int streams) { - this.streams = streams; - } - - public int getMaxMessageSize() { - return maxMessageSize; - } - - public void setMaxMessageSize(int maxMessageSize) { - this.maxMessageSize = maxMessageSize; - } - - public Map<String, String> getOverrides() { - return overrides; - } - - public void setOverrides(Map<String, String> overrides) { - this.overrides = overrides; - } - - public KafkaCollector.Builder toBuilder() { - return KafkaCollector.builder() - .topic(topic) - .zookeeper(zookeeper) - .groupId(groupId) - .streams(streams) - .maxMessageSize(maxMessageSize) - .overrides(overrides); - } -} diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories b/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 9daca53..0000000 --- a/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories 
+++ /dev/null @@ -1,2 +0,0 @@ -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -zipkin2.autoconfigure.collector.kafka08.ZipkinKafka08CollectorAutoConfiguration diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml b/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml deleted file mode 100644 index 04ccf25..0000000 --- a/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml +++ /dev/null @@ -1,7 +0,0 @@ -zipkin: - collector: - kafka: - # ZooKeeper host string, comma-separated host:port value. - zookeeper: ${KAFKA_ZOOKEEPER:} - # Maximum size of a message containing spans in bytes - max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576} diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java deleted file mode 100644 index 1ceeb0d..0000000 --- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package zipkin2.autoconfigure.collector.kafka08; - -import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.context.annotation.Configuration; -import zipkin2.collector.kafka08.KafkaCollector; - -/** opens package access for testing */ -public final class Access { - - /** Just registering properties to avoid automatically connecting to a Kafka server */ - public static void registerKafkaProperties(AnnotationConfigApplicationContext context) { - context.register( - PropertyPlaceholderAutoConfiguration.class, EnableKafkaCollectorProperties.class); - } - - @Configuration - @EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class) - static class EnableKafkaCollectorProperties {} - - public static KafkaCollector.Builder collectorBuilder( - AnnotationConfigApplicationContext context) { - return context.getBean(ZipkinKafkaCollectorProperties.class).toBuilder(); - } -} diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java deleted file mode 100644 index b8c2b81..0000000 --- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package zipkin2.autoconfigure.collector.kafka08; - -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; -import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; -import org.springframework.boot.test.util.TestPropertyValues; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import zipkin2.collector.Collector; -import zipkin2.collector.CollectorMetrics; -import zipkin2.collector.CollectorSampler; -import zipkin2.collector.kafka08.KafkaCollector; -import zipkin2.storage.InMemoryStorage; -import zipkin2.storage.StorageComponent; - -import static org.assertj.core.api.Assertions.assertThat; - -public class ZipkinKafka08CollectorAutoConfigurationTest { - - @Rule public ExpectedException thrown = ExpectedException.none(); - - AnnotationConfigApplicationContext context; - - @After - public void close() { - if (context != null) { - context.close(); - } - } - - @Test - public void doesntProvidesCollectorComponent_whenKafkaZooKeeperUnset() { - context = new AnnotationConfigApplicationContext(); - context.register( - PropertyPlaceholderAutoConfiguration.class, - 
ZipkinKafka08CollectorAutoConfiguration.class, - InMemoryConfiguration.class); - context.refresh(); - - thrown.expect(NoSuchBeanDefinitionException.class); - context.getBean(Collector.class); - } - - @Test - public void providesCollectorComponent_whenZooKeeperSet() { - context = new AnnotationConfigApplicationContext(); - TestPropertyValues.of("zipkin.collector.kafka.zookeeper:localhost").applyTo(context); - context.register( - PropertyPlaceholderAutoConfiguration.class, - ZipkinKafka08CollectorAutoConfiguration.class, - InMemoryConfiguration.class); - context.refresh(); - - assertThat(context.getBean(KafkaCollector.class)).isNotNull(); - } - - @Test - public void canOverrideProperty_topic() { - context = new AnnotationConfigApplicationContext(); - TestPropertyValues.of( - "zipkin.collector.kafka.zookeeper:localhost", - "zipkin.collector.kafka.topic:zapkin") - .applyTo(context); - context.register( - PropertyPlaceholderAutoConfiguration.class, - ZipkinKafka08CollectorAutoConfiguration.class, - InMemoryConfiguration.class); - context.refresh(); - - assertThat(context.getBean(ZipkinKafkaCollectorProperties.class).getTopic()) - .isEqualTo("zapkin"); - } - - @Configuration - static class InMemoryConfiguration { - @Bean - CollectorSampler sampler() { - return CollectorSampler.ALWAYS_SAMPLE; - } - - @Bean - CollectorMetrics metrics() { - return CollectorMetrics.NOOP_METRICS; - } - - @Bean - StorageComponent storage() { - return InMemoryStorage.newBuilder().build(); - } - } -} diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java deleted file mode 100644 index 88a490c..0000000 --- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one 
or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package zipkin2.collector.kafka08; - -import org.junit.Test; -import org.springframework.boot.test.util.TestPropertyValues; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import zipkin2.autoconfigure.collector.kafka08.Access; -import zipkin2.storage.InMemoryStorage; - -import static org.assertj.core.api.Assertions.assertThat; - -public class NestedPropertyOverrideTest { - @Test - public void overrideWithNestedProperties() { - AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); - TestPropertyValues.of( - "zipkin.collector.kafka.zookeeper:localhost", - "zipkin.collector.kafka.overrides.auto.offset.reset:largest").applyTo(context); - Access.registerKafkaProperties(context); - context.refresh(); - - assertThat( - Access.collectorBuilder(context) - .storage(InMemoryStorage.newBuilder().build()) - .build() - .connector - .config - .autoOffsetReset()) - .isEqualTo("largest"); - } -} diff --git a/zipkin-autoconfigure/pom.xml b/zipkin-autoconfigure/pom.xml index cc1c81e..5cc3810 100644 --- a/zipkin-autoconfigure/pom.xml +++ b/zipkin-autoconfigure/pom.xml @@ -37,7 +37,6 @@ </properties> <modules> - <module>collector-kafka08</module> <module>collector-scribe</module> 
</modules> diff --git a/zipkin-collector/kafka/README.md b/zipkin-collector/kafka/README.md index 9b459a0..49eed4d 100644 --- a/zipkin-collector/kafka/README.md +++ b/zipkin-collector/kafka/README.md @@ -1,4 +1,4 @@ -# collector-kafka10 +# collector-kafka ## KafkaCollector This collector is implemented as a Kafka consumer supporting Kafka brokers running diff --git a/zipkin-collector/kafka08/README.md b/zipkin-collector/kafka08/README.md deleted file mode 100644 index 142f53f..0000000 --- a/zipkin-collector/kafka08/README.md +++ /dev/null @@ -1,41 +0,0 @@ -# collector-kafka - -## KafkaCollector -This collector polls a Kafka 8.2.2+ topic for messages that contain -a list of spans in json or TBinaryProtocol big-endian encoding. These -spans are pushed to a span consumer. - -`zipkin2.collector.kafka08.KafkaCollector.Builder` includes defaults that will -operate against a Kafka topic advertised in Zookeeper. - -## Encoding spans into Kafka messages -The message's binary data includes a list of spans. Supported encodings -are the same as the http [POST /spans](http://zipkin.io/zipkin-api/#/paths/%252Fspans) body. - -### Json -The message's binary data is a list of spans in json. The first character must be '[' (decimal 91). - -`Codec.JSON.writeSpans(spans)` performs the correct json encoding. 
- -Here's an example, sending a list of a single span to the zipkin topic: - -```bash -$ kafka-console-producer.sh --broker-list $ADVERTISED_HOST:9092 --topic zipkin -[{"traceId":"1","name":"bang","id":"2","timestamp":1234,"binaryAnnotations":[{"key":"lc","value":"bamm-bamm","endpoint":{"serviceName":"flintstones","ipv4":"127.0.0.1"}}]}] -``` - -### Thrift -The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol - -`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion: -``` -write_byte(12) // type of the list elements: 12 == struct -write_i32(count) // count of spans that will follow -for (int i = 0; i < count; i++) { - writeTBinaryProtocol(spans(i)) -} -``` - -### Legacy encoding -Older versions of zipkin accepted a single span per message, as opposed -to a list per message. This practice is deprecated, but still supported. diff --git a/zipkin-collector/kafka08/pom.xml b/zipkin-collector/kafka08/pom.xml deleted file mode 100644 index 10d80a0..0000000 --- a/zipkin-collector/kafka08/pom.xml +++ /dev/null @@ -1,59 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.zipkin.zipkin2</groupId> - <artifactId>zipkin-collector-parent</artifactId> - <version>2.13.1-SNAPSHOT</version> - </parent> - - <artifactId>zipkin-collector-kafka08</artifactId> - <name>Collector: Kafka (Legacy)</name> - - <properties> - <main.basedir>${project.basedir}/../..</main.basedir> - <!-- This is pinned to Kafka 0.8.x client as 0.9.x brokers work with them, but not visa-versa - http://docs.confluent.io/2.0.0/upgrade.html --> - <kafka.version>0.8.2.2</kafka.version> - </properties> - - <dependencies> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>zipkin-collector</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>${kafka.version}</version> - <exclusions> - <!-- don't eagerly bind slf4j --> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> -</project> diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java deleted file mode 100644 index b68cc03..0000000 --- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package zipkin2.collector.kafka08; - -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ZookeeperConsumerConnector; -import zipkin2.CheckResult; -import zipkin2.collector.Collector; -import zipkin2.collector.CollectorComponent; -import zipkin2.collector.CollectorMetrics; -import zipkin2.collector.CollectorSampler; -import zipkin2.storage.SpanConsumer; -import zipkin2.storage.StorageComponent; - -import static kafka.consumer.Consumer.createJavaConsumerConnector; -import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; - -/** - * This collector polls a Kafka topic for messages that contain TBinaryProtocol big-endian encoded - * lists of spans. These spans are pushed to a {@link SpanConsumer#accept span consumer}. - * - * <p>This collector remains a Kafka 0.8.x consumer, while Zipkin systems update to 0.9+. 
- */ -public final class KafkaCollector extends CollectorComponent { - - public static Builder builder() { - return new Builder(); - } - - /** Configuration including defaults needed to consume spans from a Kafka topic. */ - public static final class Builder extends CollectorComponent.Builder { - final Properties properties = new Properties(); - Collector.Builder delegate = Collector.newBuilder(KafkaCollector.class); - CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; - String topic = "zipkin"; - int streams = 1; - - @Override - public Builder storage(StorageComponent storage) { - delegate.storage(storage); - return this; - } - - @Override - public Builder sampler(CollectorSampler sampler) { - delegate.sampler(sampler); - return this; - } - - @Override - public Builder metrics(CollectorMetrics metrics) { - if (metrics == null) throw new NullPointerException("metrics == null"); - this.metrics = metrics.forTransport("kafka"); - delegate.metrics(this.metrics); - return this; - } - - /** Topic zipkin spans will be consumed from. Defaults to "zipkin" */ - public Builder topic(String topic) { - if (topic == null) throw new NullPointerException("topic == null"); - this.topic = topic; - return this; - } - - /** The zookeeper connect string, ex. 127.0.0.1:2181. No default */ - public Builder zookeeper(String zookeeper) { - if (zookeeper == null) throw new NullPointerException("zookeeper == null"); - properties.put("zookeeper.connect", zookeeper); - return this; - } - - /** The consumer group this process is consuming on behalf of. Defaults to "zipkin" */ - public Builder groupId(String groupId) { - if (groupId == null) throw new NullPointerException("groupId == null"); - properties.put(GROUP_ID_CONFIG, groupId); - return this; - } - - /** Count of threads/streams consuming the topic. Defaults to 1 */ - public Builder streams(int streams) { - this.streams = streams; - return this; - } - - /** Maximum size of a message containing spans in bytes. 
Defaults to 1 MiB */ - public Builder maxMessageSize(int bytes) { - properties.put("fetch.message.max.bytes", String.valueOf(bytes)); - return this; - } - - /** - * By default, a consumer will be built from properties derived from builder defaults, as well - * "auto.offset.reset" -> "smallest". Any properties set here will override the consumer config. - * - * <p>For example: Only consume spans since you connected by setting the below. - * - * <pre>{@code - * Map<String, String> overrides = new LinkedHashMap<>(); - * overrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); - * builder.overrides(overrides); - * }</pre> - * - * @see org.apache.kafka.clients.consumer.ConsumerConfig - */ - public final Builder overrides(Map<String, ?> overrides) { - if (overrides == null) throw new NullPointerException("overrides == null"); - properties.putAll(overrides); - return this; - } - - @Override - public KafkaCollector build() { - return new KafkaCollector(this); - } - - Builder() { - // Settings below correspond to "Old Consumer Configs" - // http://kafka.apache.org/documentation.html - properties.put(GROUP_ID_CONFIG, "zipkin"); - properties.put("fetch.message.max.bytes", String.valueOf(1024 * 1024)); - // Same default as zipkin-scala, and keeps tests from hanging - properties.put(AUTO_OFFSET_RESET_CONFIG, "smallest"); - } - } - - final LazyConnector connector; - final LazyStreams streams; - - KafkaCollector(Builder builder) { - connector = new LazyConnector(builder); - streams = new LazyStreams(builder, connector); - } - - @Override - public KafkaCollector start() { - connector.get(); - streams.get(); - return this; - } - - @Override - public CheckResult check() { - try { - connector.get(); // make sure the connector didn't throw - CheckResult failure = streams.failure.get(); // check the streams didn't quit - if (failure != null) return failure; - return CheckResult.OK; - } catch (RuntimeException e) { - return CheckResult.failed(e); - } - } - - static final class 
LazyConnector { - - final ConsumerConfig config; - volatile ZookeeperConsumerConnector connector; - - LazyConnector(Builder builder) { - this.config = new ConsumerConfig(builder.properties); - } - - ZookeeperConsumerConnector get() { - if (connector == null) { - synchronized (this) { - if (connector == null) { - connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(config); - } - } - } - return connector; - } - - void close() { - ZookeeperConsumerConnector maybeConnector = connector; - if (maybeConnector == null) return; - maybeConnector.shutdown(); - } - } - - @Override - public void close() { - streams.close(); - connector.close(); - } - - static final class LazyStreams { - final int streams; - final String topic; - final Collector collector; - final CollectorMetrics metrics; - final LazyConnector connector; - final AtomicReference<CheckResult> failure = new AtomicReference<>(); - volatile ExecutorService pool; - - LazyStreams(Builder builder, LazyConnector connector) { - this.streams = builder.streams; - this.topic = builder.topic; - this.collector = builder.delegate.build(); - this.metrics = builder.metrics; - this.connector = connector; - } - - ExecutorService get() { - if (pool == null) { - synchronized (this) { - if (pool == null) { - pool = compute(); - } - } - } - return pool; - } - - void close() { - ExecutorService maybePool = pool; - if (maybePool == null) return; - maybePool.shutdownNow(); - try { - maybePool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // at least we tried - } - } - - ExecutorService compute() { - ExecutorService pool = - streams == 1 - ? 
Executors.newSingleThreadExecutor() - : Executors.newFixedThreadPool(streams); - - Map<String, Integer> topicCountMap = new LinkedHashMap<>(1); - topicCountMap.put(topic, streams); - - for (KafkaStream<byte[], byte[]> stream : - connector.get().createMessageStreams(topicCountMap).get(topic)) { - pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics))); - } - return pool; - } - - Runnable guardFailures(final Runnable delegate) { - return new Runnable() { - @Override - public void run() { - try { - delegate.run(); - } catch (RuntimeException e) { - failure.set(CheckResult.failed(e)); - } - } - }; - } - } -} diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java deleted file mode 100644 index 4232c2e..0000000 --- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package zipkin2.collector.kafka08; - -import java.util.Collections; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import zipkin2.Callback; -import zipkin2.Span; -import zipkin2.codec.SpanBytesDecoder; -import zipkin2.collector.Collector; -import zipkin2.collector.CollectorMetrics; - -/** Consumes spans from Kafka messages, ignoring malformed input */ -final class KafkaStreamProcessor implements Runnable { - static final Callback<Void> NOOP = - new Callback<Void>() { - @Override - public void onSuccess(Void value) {} - - @Override - public void onError(Throwable t) {} - }; - - final KafkaStream<byte[], byte[]> stream; - final Collector collector; - final CollectorMetrics metrics; - - KafkaStreamProcessor( - KafkaStream<byte[], byte[]> stream, Collector collector, CollectorMetrics metrics) { - this.stream = stream; - this.collector = collector; - this.metrics = metrics; - } - - @Override - public void run() { - ConsumerIterator<byte[], byte[]> messages = stream.iterator(); - while (messages.hasNext()) { - byte[] bytes = messages.next().message(); - metrics.incrementMessages(); - metrics.incrementBytes(bytes.length); - if (bytes.length == 0) continue; // lenient on empty messages - - if (bytes.length < 2) { // need two bytes to check if protobuf - metrics.incrementMessagesDropped(); - continue; - } - - // If we received legacy single-span encoding, decode it into a singleton list - if (!protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) { - Span span; - try { - span = SpanBytesDecoder.THRIFT.decodeOne(bytes); - } catch (RuntimeException e) { - metrics.incrementMessagesDropped(); - continue; - } - collector.accept(Collections.singletonList(span), NOOP); - } else { - collector.acceptSpans(bytes, NOOP); - } - } - } - - /* span key or trace ID key */ - static boolean protobuf3(byte[] bytes) { - return bytes[0] == 10 && bytes[1] != 0; // varint follows and won't be zero - } -} diff --git 
a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java deleted file mode 100644 index 290a71e..0000000 --- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package zipkin2.collector.kafka08; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import org.I0Itec.zkclient.exception.ZkTimeoutException; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.Timeout; -import zipkin2.Call; -import zipkin2.Callback; -import zipkin2.CheckResult; -import zipkin2.Span; -import zipkin2.TestObjects; -import zipkin2.codec.SpanBytesEncoder; -import zipkin2.collector.InMemoryCollectorMetrics; -import zipkin2.collector.kafka08.KafkaCollector.Builder; -import zipkin2.storage.SpanConsumer; -import zipkin2.storage.SpanStore; -import zipkin2.storage.StorageComponent; - -import static org.assertj.core.api.Assertions.assertThat; -import static zipkin2.TestObjects.CLIENT_SPAN; -import static zipkin2.TestObjects.UTF_8; -import static zipkin2.codec.SpanBytesEncoder.THRIFT; - -public class ITKafkaCollector { - @Rule public ExpectedException thrown = ExpectedException.none(); - @ClassRule public static Timeout globalTimeout = Timeout.seconds(20); - - List<Span> spans = Arrays.asList(TestObjects.LOTS_OF_SPANS[0], TestObjects.LOTS_OF_SPANS[1]); - - Producer<String, byte[]> producer = KafkaTestGraph.INSTANCE.producer(); - InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); - InMemoryCollectorMetrics kafkaMetrics = metrics.forTransport("kafka"); - - LinkedBlockingQueue<List<Span>> recvdSpans = new LinkedBlockingQueue<>(); - SpanConsumer consumer = (spans) -> { - recvdSpans.add(spans); - return Call.create(null); - }; - - @Test - public void checkPasses() { - try (KafkaCollector collector = newKafkaTransport(builder("check_passes"), consumer)) { - assertThat(collector.check().ok()).isTrue(); - } - } - - @Test - public void start_failsOnInvalidZooKeeper() { - 
thrown.expect(ZkTimeoutException.class); - thrown.expectMessage("Unable to connect to zookeeper server within timeout: 6000"); - - Builder builder = builder("fail_invalid_zk").zookeeper("1.1.1.1"); - - try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {} - } - - @Test - public void canSetMaxMessageSize() { - Builder builder = builder("max_message").maxMessageSize(1); - - try (KafkaCollector collector = newKafkaTransport(builder, consumer)) { - assertThat(collector.connector.get().config().fetchMessageMaxBytes()).isEqualTo(1); - } - } - - /** Ensures legacy encoding works: a single TBinaryProtocol encoded span */ - @Test - public void messageWithSingleThriftSpan() throws Exception { - Builder builder = builder("single_span"); - - byte[] bytes = THRIFT.encode(CLIENT_SPAN); - producer.send(new KeyedMessage<>(builder.topic, bytes)); - - try (KafkaCollector collector = newKafkaTransport(builder, consumer)) { - assertThat(recvdSpans.take()).containsExactly(CLIENT_SPAN); - } - - assertThat(kafkaMetrics.messages()).isEqualTo(1); - assertThat(kafkaMetrics.messagesDropped()).isZero(); - assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length); - assertThat(kafkaMetrics.spans()).isEqualTo(1); - assertThat(kafkaMetrics.spansDropped()).isZero(); - } - - /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */ - @Test - public void messageWithMultipleSpans_thrift() throws Exception { - messageWithMultipleSpans(builder("multiple_spans_thrift"), THRIFT); - } - - /** Ensures list encoding works: a json encoded list of spans */ - @Test - public void messageWithMultipleSpans_json() throws Exception { - messageWithMultipleSpans(builder("multiple_spans_json"), SpanBytesEncoder.JSON_V1); - } - - /** Ensures list encoding works: a version 2 json list of spans */ - @Test - public void messageWithMultipleSpans_json2() throws Exception { - messageWithMultipleSpans(builder("multiple_spans_json2"), SpanBytesEncoder.JSON_V2); - } - - /** Ensures list 
encoding works: proto3 ListOfSpans */ - @Test - public void messageWithMultipleSpans_proto3() throws Exception { - messageWithMultipleSpans(builder("multiple_spans_proto3"), SpanBytesEncoder.PROTO3); - } - - void messageWithMultipleSpans(Builder builder, SpanBytesEncoder encoder) throws Exception { - byte[] message = encoder.encodeList(spans); - - producer.send(new KeyedMessage<>(builder.topic, message)); - - try (KafkaCollector collector = newKafkaTransport(builder, consumer)) { - assertThat(recvdSpans.take()).containsAll(spans); - } - - assertThat(kafkaMetrics.messages()).isEqualTo(1); - assertThat(kafkaMetrics.messagesDropped()).isZero(); - assertThat(kafkaMetrics.bytes()).isEqualTo(message.length); - assertThat(kafkaMetrics.spans()).isEqualTo(spans.size()); - assertThat(kafkaMetrics.spansDropped()).isZero(); - } - - /** Ensures malformed spans don't hang the collector */ - @Test - public void skipsMalformedData() throws Exception { - Builder builder = builder("decoder_exception"); - - byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json - byte[] malformed2 = "malformed".getBytes(UTF_8); - producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); - producer.send(new KeyedMessage<>(builder.topic, new byte[0])); - producer.send(new KeyedMessage<>(builder.topic, malformed1)); - producer.send(new KeyedMessage<>(builder.topic, malformed2)); - producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); - - try (KafkaCollector collector = newKafkaTransport(builder, consumer)) { - assertThat(recvdSpans.take()).containsExactlyElementsOf(spans); - // the only way we could read this, is if the malformed spans were skipped. 
- assertThat(recvdSpans.take()).containsExactlyElementsOf(spans); - } - - assertThat(kafkaMetrics.messages()).isEqualTo(5); - assertThat(kafkaMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty - assertThat(kafkaMetrics.bytes()) - .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length); - assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2); - assertThat(kafkaMetrics.spansDropped()).isZero(); - } - - /** Guards against errors that leak from storage, such as InvalidQueryException */ - @Test - public void skipsOnStorageException() throws Exception { - Builder builder = builder("storage_exception"); - - AtomicInteger counter = new AtomicInteger(); - consumer = (input) -> new Call.Base<Void>() { - - @Override protected Void doExecute() { - throw new AssertionError(); - } - - @Override protected void doEnqueue(Callback<Void> callback) { - if (counter.getAndIncrement() == 1) { - callback.onError(new RuntimeException("storage fell over")); - } else { - recvdSpans.add(spans); - callback.onSuccess(null); - } - } - - @Override public Call<Void> clone() { - throw new AssertionError(); - } - }; - - producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); - producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); // tossed on error - producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); - - try (KafkaCollector collector = newKafkaTransport(builder, consumer)) { - assertThat(recvdSpans.take()).containsExactlyElementsOf(spans); - // the only way we could read this, is if the malformed span was skipped. 
- assertThat(recvdSpans.take()).containsExactlyElementsOf(spans); - } - - assertThat(kafkaMetrics.messages()).isEqualTo(3); - assertThat(kafkaMetrics.messagesDropped()).isZero(); // storage failure isn't a message failure - assertThat(kafkaMetrics.bytes()).isEqualTo(THRIFT.encodeList(spans).length * 3); - assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 3); - assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size()); // only one dropped - } - - Builder builder(String topic) { - return new Builder().metrics(metrics).zookeeper("127.0.0.1:2181").topic(topic); - } - - KafkaCollector newKafkaTransport(Builder builder, SpanConsumer consumer) { - return new KafkaCollector(builder.storage(buildStorage(consumer))).start(); - } - - StorageComponent buildStorage(final SpanConsumer spanConsumer) { - return new StorageComponent() { - @Override - public SpanStore spanStore() { - throw new AssertionError(); - } - - @Override - public SpanConsumer spanConsumer() { - return spanConsumer; - } - - @Override - public CheckResult check() { - return CheckResult.OK; - } - - @Override - public void close() { - throw new AssertionError(); - } - }; - } -} diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java deleted file mode 100644 index 3ff9beb..0000000 --- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package zipkin2.collector.kafka08; - -import java.util.Properties; -import kafka.common.FailedToSendMessageException; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.exception.ZkTimeoutException; -import org.junit.AssumptionViolatedException; - -/** Tests only execute when ZK and Kafka are listening on 127.0.0.1 on default ports. */ -enum KafkaTestGraph { - INSTANCE; - - private AssumptionViolatedException ex; - private Producer<String, byte[]> producer; - - synchronized Producer<String, byte[]> producer() { - if (ex != null) throw ex; - if (this.producer == null) { - Properties producerProps = new Properties(); - producerProps.put("metadata.broker.list", "127.0.0.1:9092"); - producerProps.put("producer.type", "sync"); - producer = new Producer<>(new ProducerConfig(producerProps)); - try { - new ZkClient("127.0.0.1:2181", 1000); - producer.send(new KeyedMessage<>("test", new byte[0])); - } catch (FailedToSendMessageException | ZkTimeoutException e) { - throw ex = new AssumptionViolatedException(e.getMessage(), e); - } - } - return producer; - } -} diff --git a/zipkin-collector/kafka08/src/test/resources/log4j.properties b/zipkin-collector/kafka08/src/test/resources/log4j.properties deleted file mode 100644 index 15345f3..0000000 --- a/zipkin-collector/kafka08/src/test/resources/log4j.properties +++ /dev/null @@ -1,7 +0,0 @@ -# By default, everything goes to console and file -log4j.rootLogger=WARN, A1 -# A1 is set 
to be a ConsoleAppender. -log4j.appender.A1=org.apache.log4j.ConsoleAppender -log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n -log4j.appender.A1.ImmediateFlush=true diff --git a/zipkin-collector/kafka08/src/test/resources/log4j2.properties b/zipkin-collector/kafka08/src/test/resources/log4j2.properties deleted file mode 100755 index c437666..0000000 --- a/zipkin-collector/kafka08/src/test/resources/log4j2.properties +++ /dev/null @@ -1,11 +0,0 @@ -appenders=console -appender.console.type=Console -appender.console.name=STDOUT -appender.console.layout.type=PatternLayout -appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n -rootLogger.level=warn -rootLogger.appenderRefs=stdout -rootLogger.appenderRef.stdout.ref=STDOUT -# don't waste logs when ZK check fails -logger.zk.name=org.apache.zookeeper.ClientCnxn -logger.zk.level=off diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml index 153398d..246f176 100644 --- a/zipkin-collector/pom.xml +++ b/zipkin-collector/pom.xml @@ -42,7 +42,6 @@ <module>kafka</module> <module>rabbitmq</module> <module>scribe</module> - <module>kafka08</module> </modules> <dependencies> diff --git a/zipkin-server/README.md b/zipkin-server/README.md index 7832c84..7e64055 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -352,14 +352,6 @@ Specifying bootstrap servers as a system property, instead of an environment var $ java -Dzipkin.collector.kafka.bootstrap-servers=127.0.0.1:9092 -jar zipkin.jar ``` -#### Migration from Kafka < 0.8.1 - -As explained [on kafka wiki](https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka), offsets were stored in ZooKeeper. This has changed and offsets are now stored directly in Kafka. You need to update offsets in Kafka 0.10 by following the instructions. - -#### Kafka (Legacy) Collector -The default collector is for Kafka 0.10.x+ brokers. 
You can use Kafka -0.8 brokers via an external module. See [zipkin-autoconfigure/collector-kafka08](../zipkin-autoconfigure/collector-kafka08/). - ### RabbitMQ collector The [RabbitMQ collector](../zipkin-collector/rabbitmq) will be enabled when the `addresses` or `uri` for the RabbitMQ server(s) is set.
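
For operators affected by this removal, the following is a minimal pre-flight sketch and is not part of the commit. It refuses to proceed while the removed `KAFKA_ZOOKEEPER` variable is still set, and otherwise reports the bootstrap servers that would be used. The function name `check_kafka_env` and its messages are hypothetical; only the two variable names come from the commit message.

```bash
#!/usr/bin/env bash
# Hypothetical helper, not shipped with zipkin: checks the environment
# before launching a server built after Kafka 0.8 support was removed.
check_kafka_env() {
  # KAFKA_ZOOKEEPER configured the removed Kafka 0.8 collector.
  if [ -n "${KAFKA_ZOOKEEPER:-}" ]; then
    echo "KAFKA_ZOOKEEPER is set, but Kafka 0.8 support was removed; use KAFKA_BOOTSTRAP_SERVERS instead" >&2
    return 1
  fi
  # Nothing to report when no Kafka broker is configured at all.
  if [ -z "${KAFKA_BOOTSTRAP_SERVERS:-}" ]; then
    echo "KAFKA_BOOTSTRAP_SERVERS is not set" >&2
    return 0
  fi
  echo "Kafka collector would use bootstrap servers: ${KAFKA_BOOTSTRAP_SERVERS}"
}
```

A launch script could call `check_kafka_env || exit 1` before `java -jar zipkin.jar`, assuming the broker list is exported as `KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092` or passed through the system property shown in the README context above.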
