http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml deleted file mode 100644 index 51f55b3..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml +++ /dev/null @@ -1,83 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-elasticsearch2_2.10</artifactId> - <name>flink-connector-elasticsearch2</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <elasticsearch.version>2.3.5</elasticsearch.version> - </properties> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.elasticsearch</groupId> - <artifactId>elasticsearch</artifactId> - <version>${elasticsearch.version}</version> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> - -</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java deleted file mode 100644 index 650931f..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.bulk.BulkProcessor; - -public class BulkProcessorIndexer implements RequestIndexer { - private final BulkProcessor bulkProcessor; - - public BulkProcessorIndexer(BulkProcessor bulkProcessor) { - this.bulkProcessor = bulkProcessor; - } - - @Override - public void add(ActionRequest... actionRequests) { - for (ActionRequest actionRequest : actionRequests) { - this.bulkProcessor.add(actionRequest); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java deleted file mode 100644 index e839589..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import com.google.common.collect.ImmutableList; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.util.Preconditions; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Sink that emits its input elements in bulk to an Elasticsearch cluster. - * - * <p> - * When using the second constructor - * {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a {@link TransportClient} will - * be used. - * - * <p> - * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster - * can be connected to. - * - * <p> - * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating - * {@link TransportClient}. The config keys can be found in the Elasticsearch - * documentation. An important setting is {@code cluster.name}, this should be set to the name - * of the cluster that the sink should emit to. - * - * <p> - * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. - * This will buffer elements before sending a request to the cluster. The behaviour of the - * {@code BulkProcessor} can be configured using these config keys: - * <ul> - * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer - * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer - * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two - * settings in milliseconds - * </ul> - * - * <p> - * You also have to provide an {@link RequestIndexer}. This is used to create an - * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See - * {@link RequestIndexer} for an example. - * - * @param <T> Type of the elements emitted by this sink - */ -public class ElasticsearchSink<T> extends RichSinkFunction<T> { - - public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; - public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; - public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); - - /** - * The user specified config map that we forward to Elasticsearch when we create the Client. - */ - private final Map<String, String> userConfig; - - /** - * The list of nodes that the TransportClient should connect to. This is null if we are using - * an embedded Node to get a Client. - */ - private final List<InetSocketAddress> transportAddresses; - - /** - * The builder that is used to construct an {@link IndexRequest} from the incoming element. - */ - private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; - - /** - * The Client that was either retrieved from a Node or is a TransportClient. - */ - private transient Client client; - - /** - * Bulk processor that was created using the client - */ - private transient BulkProcessor bulkProcessor; - - /** - * Bulk {@link org.elasticsearch.action.ActionRequest} indexer - */ - private transient RequestIndexer requestIndexer; - - /** - * This is set from inside the BulkProcessor listener if there where failures in processing. - */ - private final AtomicBoolean hasFailure = new AtomicBoolean(false); - - /** - * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. - */ - private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); - - /** - * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. - * - * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor - * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient} - * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element - * - */ - public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - this.userConfig = userConfig; - this.elasticsearchSinkFunction = elasticsearchSinkFunction; - Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0); - this.transportAddresses = transportAddresses; - } - - /** - * Initializes the connection to Elasticsearch by creating a - * {@link org.elasticsearch.client.transport.TransportClient}. - */ - @Override - public void open(Configuration configuration) { - List<TransportAddress> transportNodes; - transportNodes = new ArrayList<>(transportAddresses.size()); - for (InetSocketAddress address : transportAddresses) { - transportNodes.add(new InetSocketTransportAddress(address)); - } - - Settings settings = Settings.settingsBuilder().put(userConfig).build(); - - TransportClient transportClient = TransportClient.builder().settings(settings).build(); - for (TransportAddress transport: transportNodes) { - transportClient.addTransportAddress(transport); - } - - // verify that we actually are connected to a cluster - ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes()); - if (nodes.isEmpty()) { - throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); - } - - client = transportClient; - - if (LOG.isInfoEnabled()) { - LOG.info("Created Elasticsearch TransportClient {}", client); - } - - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - for (BulkItemResponse itemResp : response.getItems()) { - if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); - } - } - hasFailure.set(true); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error(failure.getMessage()); - failureThrowable.compareAndSet(null, failure); - hasFailure.set(true); - } - }); - - // This makes flush() blocking - bulkProcessorBuilder.setConcurrentRequests(0); - - ParameterTool params = ParameterTool.fromMap(userConfig); - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { - bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { - bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( - CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { - bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); - } - - bulkProcessor = bulkProcessorBuilder.build(); - requestIndexer = new BulkProcessorIndexer(bulkProcessor); - } - - @Override - public void invoke(T element) { - elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer); - } - - @Override - public void close() { - if (bulkProcessor != null) { - bulkProcessor.close(); - bulkProcessor = null; - } - - if (client != null) { - client.close(); - } - - if (hasFailure.get()) { - Throwable cause = failureThrowable.get(); - if (cause != null) { - throw new RuntimeException("An error occured in ElasticsearchSink.", cause); - } else { - throw new RuntimeException("An error occured in ElasticsearchSink."); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java deleted file mode 100644 index 55ba720..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.RuntimeContext; - -import java.io.Serializable; - -/** - * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream. - * - * <p> - * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch. - * - * <p> - * Example: - * - * <pre>{@code - * private static class TestElasticSearchSinkFunction implements - * ElasticsearchSinkFunction<Tuple2<Integer, String>> { - * - * public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { - * Map<String, Object> json = new HashMap<>(); - * json.put("data", element.f1); - * - * return Requests.indexRequest() - * .index("my-index") - * .type("my-type") - * .id(element.f0.toString()) - * .source(json); - * } - * - * public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) { - * indexer.add(createIndexRequest(element)); - * } - * } - * - * }</pre> - * - * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction} - */ -public interface ElasticsearchSinkFunction<T> extends Serializable, Function { - void process(T element, RuntimeContext ctx, RequestIndexer indexer); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java deleted file mode 100644 index 144a87b..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import org.elasticsearch.action.ActionRequest; - -import java.io.Serializable; - -public interface RequestIndexer extends Serializable { - void add(ActionRequest... actionRequests); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java deleted file mode 100644 index bc9bedc..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { - - private static final int NUM_ELEMENTS = 20; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - @Test - public void testTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - // Can't use {@link TransportAddress} as its not Serializable in Elasticsearch 2.x - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); - } - - @Test(expected = IllegalArgumentException.class) - public void testNullTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - source.addSink(new ElasticsearchSink<>(config, null, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); - } - - @Test(expected = IllegalArgumentException.class) - public void testEmptyTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - source.addSink(new ElasticsearchSink<>(config, new ArrayList<InetSocketAddress>(), new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); - } - - @Test(expected = JobExecutionException.class) - public void testTransportClientFails() throws Exception{ - // this checks whether the TransportClient fails early when there is no cluster to - // connect to. There isn't a similar test for the Node Client version since that - // one will block and wait for a cluster to come online - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction()); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch Node Client Test"); - } - - private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { - for (int i = 0; i < NUM_ELEMENTS && running; i++) { - ctx.collect(Tuple2.of(i, "message #" + i)); - } - } - - @Override - public void cancel() { - running = false; - } - } - - private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> { - private static final long serialVersionUID = 1L; - - public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element.f1); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element.f0.toString()) - .source(json); - } - - @Override - public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java deleted file mode 100644 index 05760e8..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2.examples; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. - */ -public class ElasticsearchExample { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SingleOutputStreamOperator<String> source = - env.generateSequence(0, 20).map(new MapFunction<Long, String>() { - /** - * The mapping method. Takes an element from the input data set and transforms - * it into exactly one element. - * - * @param value The input value. - * @return The transformed value - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); - - Map<String, String> config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>(){ - @Override - public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - })); - - env.execute("Elasticsearch Example"); - } - - private static IndexRequest createIndexRequest(String element) { - Map<String, Object> json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties deleted file mode 100644 index dc20726..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml deleted file mode 100644 index 7a077c2..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.streaming.connectors.elasticsearch2" level="WARN"/> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-streaming-connectors/flink-connector-filesystem/pom.xml deleted file mode 100644 index 20c48c6..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/pom.xml +++ /dev/null @@ -1,163 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-filesystem_2.10</artifactId> - <name>flink-connector-filesystem</name> - - <packaging>jar</packaging> - - <!-- - This is a Hadoop2 only flink module. - --> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-hadoop2</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils-junit</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-hadoop-compatibility_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>test</scope> - <type>test-jar</type> - <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>test</scope> - <type>test-jar</type> - <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minikdc</artifactId> - <version>${minikdc.version}</version> - <scope>test</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - - <!-- - https://issues.apache.org/jira/browse/DIRSHARED-134 - Required to pull the Mini-KDC transitive dependency - --> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <version>3.0.1</version> - <inherited>true</inherited> - <extensions>true</extensions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <!-- - Enforce single threaded execution to avoid port conflicts when running - secure mini DFS cluster - --> - <forkCount>1</forkCount> - <reuseForks>false</reuseForks> - </configuration> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java deleted file mode 100644 index 3e3c86b..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java +++ /dev/null @@ -1,309 +0,0 @@ -package org.apache.flink.streaming.connectors.fs; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileConstants; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.InputTypeConfigurable; -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -/** -* Implementation of AvroKeyValue writer that can be used in Sink. -* Each entry would be wrapped in GenericRecord with key/value fields(same as in m/r lib) -<pre> -Usage: -{@code - BucketingSink<Tuple2<Long, Long>> sink = new BucketingSink<Tuple2<Long, Long>>("/tmp/path"); - sink.setBucketer(new DateTimeBucketer<Tuple2<Long, Long>>("yyyy-MM-dd/HH/mm/")); - sink.setPendingSuffix(".avro"); - Map<String,String> properties = new HashMap<>(); - Schema longSchema = Schema.create(Type.LONG); - String keySchema = longSchema.toString(); - String valueSchema = longSchema.toString(); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema); - properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema); - properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true)); - properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); - - sink.setWriter(new AvroSinkWriter<Long, Long>(properties)); - sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB, -} -</pre> -*/ -public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable { - private static final long serialVersionUID = 1L; - public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key"; - public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value"; - public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS; - public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC; - public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level"; - public static final String CONF_XZ_LEVEL = "avro.xz.level"; - - private transient AvroKeyValueWriter<K, V> keyValueWriter; - - private final Map<String, String> properties; - - /** - * C'tor for the writer - * <p> - * You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above) - * @param properties - */ - @SuppressWarnings("deprecation") - public AvroKeyValueSinkWriter(Map<String, String> properties) { - this.properties = properties; - - String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA); - if (keySchemaString == null) { - throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property"); - } - Schema.parse(keySchemaString);//verifying that schema valid - - String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA); - if (valueSchemaString == null) { - throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property"); - } - Schema.parse(valueSchemaString);//verifying that schema valid - } - - private boolean getBoolean(Map<String,String> conf, String key, boolean def) { - String value = conf.get(key); - if (value == null) { - return def; - } - return Boolean.parseBoolean(value); - } - - private int getInt(Map<String,String> conf, String key, int def) { - String value = conf.get(key); - if (value == null) { - return def; - } - return Integer.parseInt(value); - } - - //this derived from AvroOutputFormatBase.getCompressionCodec(..) - private CodecFactory getCompressionCodec(Map<String,String> conf) { - if (getBoolean(conf, CONF_COMPRESS, false)) { - int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL); - int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL); - - String outputCodec = conf.get(CONF_COMPRESS_CODEC); - - if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) { - return CodecFactory.deflateCodec(deflateLevel); - } else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) { - return CodecFactory.xzCodec(xzLevel); - } else { - return CodecFactory.fromString(outputCodec); - } - } - return CodecFactory.nullCodec(); - } - - @Override - @SuppressWarnings("deprecation") - public void open(FileSystem fs, Path path) throws IOException { - super.open(fs, path); - - CodecFactory compressionCodec = getCompressionCodec(properties); - Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA)); - Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA)); - keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, getStream()); - } - - @Override - public void close() throws IOException { - super.close();//the order is important since super.close flushes inside - if (keyValueWriter != null) { - keyValueWriter.close(); - } - } - - @Override - public long flush() throws IOException { - if (keyValueWriter != null) { - keyValueWriter.sync(); - } - return super.flush(); - } - - @Override - public void write(Tuple2<K, V> element) throws IOException { - getStream(); // Throws if the stream is not open - keyValueWriter.write(element.f0, element.f1); - } - - @Override - public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { - if (!type.isTupleType()) { - throw new IllegalArgumentException("Input TypeInformation is not a tuple type."); - } - - TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type; - - if (tupleType.getArity() != 2) { - throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type."); - } - } - - @Override - public Writer<Tuple2<K, V>> duplicate() { - return new AvroKeyValueSinkWriter<K, V>(properties); - } - - // taken from m/r avro lib to remove dependency on it - private static final class AvroKeyValueWriter<K, V> { - /** A writer for the Avro container file. */ - private final DataFileWriter<GenericRecord> mAvroFileWriter; - - /** - * The writer schema for the generic record entries of the Avro - * container file. - */ - private final Schema mKeyValuePairSchema; - - /** - * A reusable Avro generic record for writing key/value pairs to the - * file. - */ - private final AvroKeyValue<Object, Object> mOutputRecord; - - AvroKeyValueWriter(Schema keySchema, Schema valueSchema, - CodecFactory compressionCodec, OutputStream outputStream, - int syncInterval) throws IOException { - // Create the generic record schema for the key/value pair. - mKeyValuePairSchema = AvroKeyValue - .getSchema(keySchema, valueSchema); - - // Create an Avro container file and a writer to it. - DatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>( - mKeyValuePairSchema); - mAvroFileWriter = new DataFileWriter<GenericRecord>( - genericDatumWriter); - mAvroFileWriter.setCodec(compressionCodec); - mAvroFileWriter.setSyncInterval(syncInterval); - mAvroFileWriter.create(mKeyValuePairSchema, outputStream); - - // Create a reusable output record. - mOutputRecord = new AvroKeyValue<Object, Object>( - new GenericData.Record(mKeyValuePairSchema)); - } - - AvroKeyValueWriter(Schema keySchema, Schema valueSchema, - CodecFactory compressionCodec, OutputStream outputStream) - throws IOException { - this(keySchema, valueSchema, compressionCodec, outputStream, - DataFileConstants.DEFAULT_SYNC_INTERVAL); - } - - void write(K key, V value) throws IOException { - mOutputRecord.setKey(key); - mOutputRecord.setValue(value); - mAvroFileWriter.append(mOutputRecord.get()); - } - - void close() throws IOException { - mAvroFileWriter.close(); - } - - long sync() throws IOException { - return mAvroFileWriter.sync(); - } - } - - // taken from AvroKeyValue avro-mapr lib - public static class AvroKeyValue<K, V> { - /** The name of the key value pair generic record. */ - public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair"; - - /** The namespace of the key value pair generic record. */ - public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce"; - - /** The name of the generic record field containing the key. */ - public static final String KEY_FIELD = "key"; - - /** The name of the generic record field containing the value. */ - public static final String VALUE_FIELD = "value"; - - /** The key/value generic record wrapped by this class. */ - public final GenericRecord mKeyValueRecord; - - /** - * Wraps a GenericRecord that is a key value pair. - */ - public AvroKeyValue(GenericRecord keyValueRecord) { - mKeyValueRecord = keyValueRecord; - } - - public GenericRecord get() { - return mKeyValueRecord; - } - - public void setKey(K key) { - mKeyValueRecord.put(KEY_FIELD, key); - } - - public void setValue(V value) { - mKeyValueRecord.put(VALUE_FIELD, value); - } - - @SuppressWarnings("unchecked") - public K getKey() { - return (K) mKeyValueRecord.get(KEY_FIELD); - } - - @SuppressWarnings("unchecked") - public V getValue() { - return (V) mKeyValueRecord.get(VALUE_FIELD); - } - - /** - * Creates a KeyValuePair generic record schema. - * - * @return A schema for a generic record with two fields: 'key' and - * 'value'. - */ - public static Schema getSchema(Schema keySchema, Schema valueSchema) { - Schema schema = Schema.createRecord(KEY_VALUE_PAIR_RECORD_NAME, - "A key/value pair", KEY_VALUE_PAIR_RECORD_NAMESPACE, false); - schema.setFields(Arrays.asList(new Schema.Field(KEY_FIELD, - keySchema, "The key", null), new Schema.Field(VALUE_FIELD, - valueSchema, "The value", null))); - return schema; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java deleted file mode 100644 index 24ad6ab..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - -import org.apache.hadoop.fs.Path; - -import java.io.Serializable; - -/** - * A bucketer is used with a {@link RollingSink} - * to put emitted elements into rolling files. - * - * <p> - * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever - * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and - * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets - * based on system time. - * - * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.Bucketer} instead. - */ -@Deprecated -public interface Bucketer extends Serializable { - - /** - * Returns {@code true} when a new bucket should be started. - * - * @param currentBucketPath The bucket {@code Path} that is currently being used. - */ - boolean shouldStartNewBucket(Path basePath, Path currentBucketPath); - - /** - * Returns the {@link Path} of a new bucket file. - * - * @param basePath The base path containing all the buckets. - * - * @return The complete new {@code Path} of the new bucket. This should include the {@code basePath} - * and also the {@code subtaskIndex} tp avoid clashes with parallel sinks. - */ - Path getNextBucketPath(Path basePath); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java deleted file mode 100644 index 174707c..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - - -/** - * A clock that can provide the current time. - * - * <p> - * Normally this would be system time, but for testing a custom {@code Clock} can be provided. - */ -public interface Clock { - - /** - * Return the current system time in milliseconds. - */ - public long currentTimeMillis(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java deleted file mode 100644 index 0df8998..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.text.SimpleDateFormat; -import java.util.Date; - -/** - * A {@link Bucketer} that assigns to buckets based on current system time. - * - * <p> - * The {@code DateTimeBucketer} will create directories of the following form: - * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path - * that was specified as a base path when creating the - * {@link RollingSink}. The {@code dateTimePath} - * is determined based on the current system time and the user provided format string. - * - * <p> - * {@link SimpleDateFormat} is used to derive a date string from the current system time and - * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling - * files will have a granularity of hours. - * - * - * <p> - * Example: - * - * <pre>{@code - * Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH"); - * }</pre> - * - * This will create for example the following bucket path: - * {@code /base/1976-12-31-14/} - * - * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead. - */ -@Deprecated -public class DateTimeBucketer implements Bucketer { - - private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class); - - private static final long serialVersionUID = 1L; - - private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; - - // We have this so that we can manually set it for tests. - private static Clock clock = new SystemClock(); - - private final String formatString; - - private transient SimpleDateFormat dateFormatter; - - /** - * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}. - */ - public DateTimeBucketer() { - this(DEFAULT_FORMAT_STRING); - } - - /** - * Creates a new {@code DateTimeBucketer} with the given date/time format string. - * - * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine - * the bucket path. - */ - public DateTimeBucketer(String formatString) { - this.formatString = formatString; - - this.dateFormatter = new SimpleDateFormat(formatString); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - this.dateFormatter = new SimpleDateFormat(formatString); - } - - - @Override - public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) { - String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); - return !(new Path(basePath, newDateTimeString).equals(currentBucketPath)); - } - - @Override - public Path getNextBucketPath(Path basePath) { - String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); - return new Path(basePath + "/" + newDateTimeString); - } - - @Override - public String toString() { - return "DateTimeBucketer{" + - "formatString='" + formatString + '\'' + - '}'; - } - - /** - * This sets the internal {@link Clock} implementation. This method should only be used for testing - * - * @param newClock The new clock to set. - */ - public static void setClock(Clock newClock) { - clock = newClock; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java deleted file mode 100644 index 6854596..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - -import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer; -import org.apache.hadoop.fs.Path; - -/** - * A {@link Bucketer} that does not perform any - * rolling of files. All files are written to the base path. - * - * @deprecated use {@link BasePathBucketer} instead. - */ -@Deprecated -public class NonRollingBucketer implements Bucketer { - private static final long serialVersionUID = 1L; - - @Override - public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) { - return false; - } - - @Override - public Path getNextBucketPath(Path basePath) { - return basePath; - } - - @Override - public String toString() { - return "NonRollingBucketer"; - } -}
