http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/pom.xml b/flink-connectors/flink-connector-elasticsearch2/pom.xml new file mode 100644 index 0000000..5fcb05e --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/pom.xml @@ -0,0 +1,83 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-elasticsearch2_2.10</artifactId> + <name>flink-connector-elasticsearch2</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <elasticsearch.version>2.3.5</elasticsearch.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + +</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java new file mode 100644 index 0000000..650931f --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+/**
+ * {@link RequestIndexer} that hands every request it receives to a wrapped {@link BulkProcessor}.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+	/** Processor that receives the requests passed to {@link #add(ActionRequest...)}. */
+	private final BulkProcessor processor;
+
+	/**
+	 * Creates an indexer on top of the given processor.
+	 *
+	 * @param bulkProcessor the processor that added requests are forwarded to
+	 */
+	public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
+		processor = bulkProcessor;
+	}
+
+	/**
+	 * Forwards the given requests, in argument order, to the wrapped processor.
+	 *
+	 * @param actionRequests the requests to add
+	 */
+	@Override
+	public void add(ActionRequest... actionRequests) {
+		final int count = actionRequests.length;
+		for (int i = 0; i < count; i++) {
+			processor.add(actionRequests[i]);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
new file mode 100644
index 0000000..e839589
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch2; + +import com.google.common.collect.ImmutableList; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.Preconditions; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Sink that emits its input elements in bulk to an Elasticsearch cluster. 
+ * + * <p> + * When using the constructor + * {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a {@link TransportClient} will + * be used. + * + * <p> + * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster + * can be connected to. + * + * <p> + * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating + * the {@link TransportClient}. The config keys can be found in the Elasticsearch + * documentation. An important setting is {@code cluster.name}; this should be set to the name + * of the cluster that the sink should emit to. + * + * <p> + * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}. + * This will buffer elements before sending a request to the cluster. The behaviour of the + * {@code BulkProcessor} can be configured using these config keys: + * <ul> + * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer + * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds + * </ul> + * + * <p> + * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create an + * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See + * {@link ElasticsearchSinkFunction} for an example. 
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
+
+	/**
+	 * The user specified config map that we forward to Elasticsearch when we create the Client.
+	 */
+	private final Map<String, String> userConfig;
+
+	/**
+	 * The list of nodes that the TransportClient should connect to. The constructor rejects
+	 * a null or empty list, so this always holds at least one address.
+	 */
+	private final List<InetSocketAddress> transportAddresses;
+
+	/**
+	 * The function that turns an incoming element into requests and adds them to the {@link RequestIndexer}.
+	 */
+	private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+	/**
+	 * The TransportClient created in {@link #open(Configuration)}; null before open() and after close().
+	 */
+	private transient Client client;
+
+	/**
+	 * Bulk processor that was created using the client
+	 */
+	private transient BulkProcessor bulkProcessor;
+
+	/**
+	 * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
+	 */
+	private transient RequestIndexer requestIndexer;
+
+	/**
+	 * This is set from inside the BulkProcessor listener if there were failures in processing.
+	 */
+	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+	/**
+	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
+	 * Only the first recorded failure is kept.
+	 */
+	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+	/**
+	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
+	 * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
+	 * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element
+	 * @throws IllegalArgumentException if {@code transportAddresses} is null or empty
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+		this.userConfig = userConfig;
+		this.elasticsearchSinkFunction = elasticsearchSinkFunction;
+		Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0);
+		this.transportAddresses = transportAddresses;
+	}
+
+	/**
+	 * Initializes the connection to Elasticsearch by creating a
+	 * {@link org.elasticsearch.client.transport.TransportClient}, then builds the
+	 * {@link BulkProcessor} and the {@link RequestIndexer} on top of it.
+	 *
+	 * @throws RuntimeException if the client is not connected to any node after the
+	 *         transport addresses have been added
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		List<TransportAddress> transportNodes;
+		transportNodes = new ArrayList<>(transportAddresses.size());
+		for (InetSocketAddress address : transportAddresses) {
+			transportNodes.add(new InetSocketTransportAddress(address));
+		}
+
+		Settings settings = Settings.settingsBuilder().put(userConfig).build();
+
+		TransportClient transportClient = TransportClient.builder().settings(settings).build();
+		for (TransportAddress transport: transportNodes) {
+			transportClient.addTransportAddress(transport);
+		}
+
+		// verify that we actually are connected to a cluster
+		ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
+		if (nodes.isEmpty()) {
+			// The client is not yet stored in a field, so close() could never release it;
+			// close it here before failing so it does not leak.
+			transportClient.close();
+			throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
+		}
+
+		client = transportClient;
+
+		LOG.info("Created Elasticsearch TransportClient {}", client);
+
+		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() {
+			@Override
+			public void beforeBulk(long executionId, BulkRequest request) {
+
+			}
+
+			@Override
+			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+				if (response.hasFailures()) {
+					for (BulkItemResponse itemResp : response.getItems()) {
+						if (itemResp.isFailed()) {
+							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+						}
+					}
+					hasFailure.set(true);
+				}
+			}
+
+			@Override
+			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+				// pass the throwable itself so the stack trace ends up in the log
+				LOG.error(failure.getMessage(), failure);
+				failureThrowable.compareAndSet(null, failure);
+				hasFailure.set(true);
+			}
+		});
+
+		// This makes flush() blocking
+		bulkProcessorBuilder.setConcurrentRequests(0);
+
+		ParameterTool params = ParameterTool.fromMap(userConfig);
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
+					CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
+		}
+
+		bulkProcessor = bulkProcessorBuilder.build();
+		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
+	}
+
+	/**
+	 * Passes the element, the runtime context and the request indexer to the user's
+	 * {@link ElasticsearchSinkFunction}.
+	 *
+	 * @param element the incoming element
+	 */
+	@Override
+	public void invoke(T element) {
+		elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
+	}
+
+	/**
+	 * Closes the bulk processor and the client, then reports any failure recorded by the
+	 * bulk listener.
+	 *
+	 * @throws RuntimeException if the bulk listener recorded a failure; the first recorded
+	 *         throwable, if any, is attached as the cause
+	 */
+	@Override
+	public void close() {
+		try {
+			if (bulkProcessor != null) {
+				bulkProcessor.close();
+				bulkProcessor = null;
+			}
+		} finally {
+			// release the client even if closing the bulk processor threw
+			if (client != null) {
+				client.close();
+				client = null;
+			}
+		}
+
+		if (hasFailure.get()) {
+			Throwable cause = failureThrowable.get();
+			if (cause != null) {
+				throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+			} else {
+				throw new RuntimeException("An error occured in ElasticsearchSink.");
+			}
+		}
+
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
new file mode 100644
index 0000000..55ba720
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch2; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.io.Serializable; + +/** + * Function that creates one or more {@link org.elasticsearch.action.ActionRequest ActionRequests} from an element + * in a Stream and adds them to a {@link RequestIndexer}. + * + * <p> + * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch. + * + * <p> + * Example: + * + * <pre>{@code + * private static class TestElasticSearchSinkFunction implements + * ElasticsearchSinkFunction<Tuple2<Integer, String>> { + * + * public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { + * Map<String, Object> json = new HashMap<>(); + * json.put("data", element.f1); + * + * return Requests.indexRequest() + * .index("my-index") + * .type("my-type") + * .id(element.f0.toString()) + * .source(json); + * } + * + * public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) { + * indexer.add(createIndexRequest(element)); + * } + * } + * + * }</pre> + * + * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction} + */ +public interface ElasticsearchSinkFunction<T> extends Serializable, Function { + /** + * Handles a single element; {@link ElasticsearchSink#invoke} calls this once per incoming element. + * + * @param element the element received by the sink + * @param ctx the runtime context of the sink that is calling this function + * @param indexer the indexer to which the requests built from the element are added + */ + void process(T element, RuntimeContext ctx, RequestIndexer indexer); +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java new file mode 100644 index 0000000..144a87b --- /dev/null +++ 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.elasticsearch2; + +import org.elasticsearch.action.ActionRequest; + +import java.io.Serializable; + +/** + * Receives the {@link ActionRequest ActionRequests} that an {@link ElasticsearchSinkFunction} creates for an element. + * + * <p> + * {@link BulkProcessorIndexer} is the implementation used by {@link ElasticsearchSink}. + */ +public interface RequestIndexer extends Serializable { + /** + * Adds one or more requests to this indexer. + * + * @param actionRequests the requests to add + */ + void add(ActionRequest... 
actionRequests); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java new file mode 100644 index 0000000..bc9bedc --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration tests for {@link ElasticsearchSink} that run a small Flink job against an
+ * embedded Elasticsearch node.
+ */
+public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
+
+	private static final int NUM_ELEMENTS = 20;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/** Writes through the TransportClient and checks that every element was indexed. */
+	@Test
+	public void testTransportClient() throws Exception {
+		Node node = startNode();
+		try {
+			// Can't use {@link TransportAddress} as it's not Serializable in Elasticsearch 2.x
+			List<InetSocketAddress> transports = new ArrayList<>();
+			transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+			runJobAndVerify(node, transports);
+		} finally {
+			node.close();
+		}
+	}
+
+	/** A null transport address list must be rejected with an IllegalArgumentException. */
+	@Test(expected = IllegalArgumentException.class)
+	public void testNullTransportClient() throws Exception {
+		Node node = startNode();
+		try {
+			runJobAndVerify(node, null);
+		} finally {
+			// the expected exception skips the rest of the test, so the node must be closed here
+			node.close();
+		}
+	}
+
+	/** An empty transport address list must be rejected with an IllegalArgumentException. */
+	@Test(expected = IllegalArgumentException.class)
+	public void testEmptyTransportClient() throws Exception {
+		Node node = startNode();
+		try {
+			runJobAndVerify(node, new ArrayList<InetSocketAddress>());
+		} finally {
+			// the expected exception skips the rest of the test, so the node must be closed here
+			node.close();
+		}
+	}
+
+	@Test(expected = JobExecutionException.class)
+	public void testTransportClientFails() throws Exception{
+		// this checks whether the TransportClient fails early when there is no cluster to
+		// connect to; this test does not start an Elasticsearch node
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(createUserConfig("my-node-client-cluster"), transports, new TestElasticsearchSinkFunction()));
+
+		env.execute("Elasticsearch Node Client Test");
+	}
+
+	/** Starts an embedded node with its data in a fresh temporary folder; the caller must close it. */
+	private static Node startNode() throws Exception {
+		File dataDir = tempFolder.newFolder();
+
+		return NodeBuilder.nodeBuilder()
+				.settings(Settings.settingsBuilder()
+						.put("path.home", dataDir.getParent())
+						.put("http.enabled", false)
+						.put("path.data", dataDir.getAbsolutePath()))
+				// set a custom cluster name to verify that user config works correctly
+				.clusterName("my-transport-client-cluster")
+				.node();
+	}
+
+	/** Builds the sink's user config for the given cluster name. */
+	private static Map<String, String> createUserConfig(String clusterName) {
+		Map<String, String> config = new HashMap<>();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+		config.put("cluster.name", clusterName);
+		return config;
+	}
+
+	/** Runs the test job with a sink using the given addresses, then checks the documents stored in the node. */
+	private static void runJobAndVerify(Node node, List<InetSocketAddress> transports) throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+
+		source.addSink(new ElasticsearchSink<>(createUserConfig("my-transport-client-cluster"), transports, new TestElasticsearchSinkFunction()));
+
+		env.execute("Elasticsearch TransportClient Test");
+
+		// verify the results
+		Client client = node.client();
+		for (int i = 0; i < NUM_ELEMENTS; i++) {
+			GetResponse response = client.get(new GetRequest("my-index",
+					"my-type", Integer.toString(i))).actionGet();
+			Assert.assertEquals("message #" + i, response.getSource().get("data"));
+		}
+	}
+
+	/** Emits NUM_ELEMENTS tuples of the form (i, "message #i"). */
+	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(Tuple2.of(i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/** Indexes each tuple into "my-index"/"my-type" with f0 as the id and f1 as the "data" field. */
+	private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+			Map<String, Object> json = new HashMap<>();
+			json.put("data", element.f1);
+
+			return Requests.indexRequest()
+					.index("my-index")
+					.type("my-type")
+					.id(element.f0.toString())
+					.source(json);
+		}
+
+		@Override
+		public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
+			indexer.add(createIndexRequest(element));
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
new file mode 100644
index 0000000..05760e8
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.elasticsearch2.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. + */ +public class ElasticsearchExample { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator<String> source = + env.generateSequence(0, 20).map(new MapFunction<Long, String>() { + /** + * The mapping method. Takes an element from the input data set and transforms + * it into exactly one element. + * + * @param value The input value. + * @return The transformed value + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. 
+ */ + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + Map<String, String> config = new HashMap<>(); + // This instructs the sink to emit after every element, otherwise they would be buffered + config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + List<InetSocketAddress> transports = new ArrayList<>(); + transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + + source.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction<String>(){ + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + })); + + env.execute("Elasticsearch Example"); + } + + private static IndexRequest createIndexRequest(String element) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .id(element) + .source(json); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..dc20726 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml new file mode 100644 index 0000000..7a077c2 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. 
The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming.connectors.elasticsearch2" level="WARN"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml new file mode 100644 index 0000000..fbc830a --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/pom.xml @@ -0,0 +1,163 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-filesystem_2.10</artifactId> + <name>flink-connector-filesystem</name> + + <packaging>jar</packaging> + + <!-- + This is a Hadoop2 only flink module. 
+ --> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-hadoop2</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-hadoop-compatibility_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + 
<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${minikdc.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + + <!-- + https://issues.apache.org/jira/browse/DIRSHARED-134 + Required to pull the Mini-KDC transitive dependency + --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>3.0.1</version> + <inherited>true</inherited> + <extensions>true</extensions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <!-- + Enforce single threaded execution to avoid port conflicts when running + secure mini DFS cluster + --> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java new file mode 100644 index 0000000..3e3c86b --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java @@ -0,0 +1,309 @@ +package org.apache.flink.streaming.connectors.fs; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +/** +* Implementation of AvroKeyValue writer that can be used in Sink. 
+* Each entry would be wrapped in GenericRecord with key/value fields(same as in m/r lib) +<pre> +Usage: +{@code + BucketingSink<Tuple2<Long, Long>> sink = new BucketingSink<Tuple2<Long, Long>>("/tmp/path"); + sink.setBucketer(new DateTimeBucketer<Tuple2<Long, Long>>("yyyy-MM-dd/HH/mm/")); + sink.setPendingSuffix(".avro"); + Map<String,String> properties = new HashMap<>(); + Schema longSchema = Schema.create(Type.LONG); + String keySchema = longSchema.toString(); + String valueSchema = longSchema.toString(); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema); + properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true)); + properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); + + sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties)); + sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB, +} +</pre> +*/ +public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable { + private static final long serialVersionUID = 1L; + public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key"; + public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value"; + public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS; + public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC; + public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level"; + public static final String CONF_XZ_LEVEL = "avro.xz.level"; + + private transient AvroKeyValueWriter<K, V> keyValueWriter; + + private final Map<String, String> properties; + + /** + * C'tor for the writer + * <p> + * You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above) + * @param properties + */ + @SuppressWarnings("deprecation") + public
AvroKeyValueSinkWriter(Map<String, String> properties) { + this.properties = properties; + + String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA); + if (keySchemaString == null) { + throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property"); + } + Schema.parse(keySchemaString);//verifying that schema valid + + String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA); + if (valueSchemaString == null) { + throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property"); + } + Schema.parse(valueSchemaString);//verifying that schema valid + } + + private boolean getBoolean(Map<String,String> conf, String key, boolean def) { + String value = conf.get(key); + if (value == null) { + return def; + } + return Boolean.parseBoolean(value); + } + + private int getInt(Map<String,String> conf, String key, int def) { + String value = conf.get(key); + if (value == null) { + return def; + } + return Integer.parseInt(value); + } + + //this derived from AvroOutputFormatBase.getCompressionCodec(..) 
+ private CodecFactory getCompressionCodec(Map<String,String> conf) { + if (getBoolean(conf, CONF_COMPRESS, false)) { + int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL); + int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL); + + String outputCodec = conf.get(CONF_COMPRESS_CODEC); + + if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) { + return CodecFactory.deflateCodec(deflateLevel); + } else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) { + return CodecFactory.xzCodec(xzLevel); + } else { + return CodecFactory.fromString(outputCodec); + } + } + return CodecFactory.nullCodec(); + } + + @Override + @SuppressWarnings("deprecation") + public void open(FileSystem fs, Path path) throws IOException { + super.open(fs, path); + + CodecFactory compressionCodec = getCompressionCodec(properties); + Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA)); + Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA)); + keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, getStream()); + } + + @Override + public void close() throws IOException { + super.close();//the order is important since super.close flushes inside + if (keyValueWriter != null) { + keyValueWriter.close(); + } + } + + @Override + public long flush() throws IOException { + if (keyValueWriter != null) { + keyValueWriter.sync(); + } + return super.flush(); + } + + @Override + public void write(Tuple2<K, V> element) throws IOException { + getStream(); // Throws if the stream is not open + keyValueWriter.write(element.f0, element.f1); + } + + @Override + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { + if (!type.isTupleType()) { + throw new IllegalArgumentException("Input TypeInformation is not a tuple type."); + } + + TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type; + + if (tupleType.getArity() != 2) { + throw new 
IllegalArgumentException("Input TypeInformation must be a Tuple2 type."); + } + } + + @Override + public Writer<Tuple2<K, V>> duplicate() { + return new AvroKeyValueSinkWriter<K, V>(properties); + } + + // taken from m/r avro lib to remove dependency on it + private static final class AvroKeyValueWriter<K, V> { + /** A writer for the Avro container file. */ + private final DataFileWriter<GenericRecord> mAvroFileWriter; + + /** + * The writer schema for the generic record entries of the Avro + * container file. + */ + private final Schema mKeyValuePairSchema; + + /** + * A reusable Avro generic record for writing key/value pairs to the + * file. + */ + private final AvroKeyValue<Object, Object> mOutputRecord; + + AvroKeyValueWriter(Schema keySchema, Schema valueSchema, + CodecFactory compressionCodec, OutputStream outputStream, + int syncInterval) throws IOException { + // Create the generic record schema for the key/value pair. + mKeyValuePairSchema = AvroKeyValue + .getSchema(keySchema, valueSchema); + + // Create an Avro container file and a writer to it. + DatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>( + mKeyValuePairSchema); + mAvroFileWriter = new DataFileWriter<GenericRecord>( + genericDatumWriter); + mAvroFileWriter.setCodec(compressionCodec); + mAvroFileWriter.setSyncInterval(syncInterval); + mAvroFileWriter.create(mKeyValuePairSchema, outputStream); + + // Create a reusable output record. 
+ mOutputRecord = new AvroKeyValue<Object, Object>( + new GenericData.Record(mKeyValuePairSchema)); + } + + AvroKeyValueWriter(Schema keySchema, Schema valueSchema, + CodecFactory compressionCodec, OutputStream outputStream) + throws IOException { + this(keySchema, valueSchema, compressionCodec, outputStream, + DataFileConstants.DEFAULT_SYNC_INTERVAL); + } + + void write(K key, V value) throws IOException { + mOutputRecord.setKey(key); + mOutputRecord.setValue(value); + mAvroFileWriter.append(mOutputRecord.get()); + } + + void close() throws IOException { + mAvroFileWriter.close(); + } + + long sync() throws IOException { + return mAvroFileWriter.sync(); + } + } + + // taken from AvroKeyValue avro-mapr lib + public static class AvroKeyValue<K, V> { + /** The name of the key value pair generic record. */ + public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair"; + + /** The namespace of the key value pair generic record. */ + public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce"; + + /** The name of the generic record field containing the key. */ + public static final String KEY_FIELD = "key"; + + /** The name of the generic record field containing the value. */ + public static final String VALUE_FIELD = "value"; + + /** The key/value generic record wrapped by this class. */ + public final GenericRecord mKeyValueRecord; + + /** + * Wraps a GenericRecord that is a key value pair. 
+ */ + public AvroKeyValue(GenericRecord keyValueRecord) { + mKeyValueRecord = keyValueRecord; + } + + public GenericRecord get() { + return mKeyValueRecord; + } + + public void setKey(K key) { + mKeyValueRecord.put(KEY_FIELD, key); + } + + public void setValue(V value) { + mKeyValueRecord.put(VALUE_FIELD, value); + } + + @SuppressWarnings("unchecked") + public K getKey() { + return (K) mKeyValueRecord.get(KEY_FIELD); + } + + @SuppressWarnings("unchecked") + public V getValue() { + return (V) mKeyValueRecord.get(VALUE_FIELD); + } + + /** + * Creates a KeyValuePair generic record schema. + * + * @return A schema for a generic record with two fields: 'key' and + * 'value'. + */ + public static Schema getSchema(Schema keySchema, Schema valueSchema) { + Schema schema = Schema.createRecord(KEY_VALUE_PAIR_RECORD_NAME, + "A key/value pair", KEY_VALUE_PAIR_RECORD_NAMESPACE, false); + schema.setFields(Arrays.asList(new Schema.Field(KEY_FIELD, + keySchema, "The key", null), new Schema.Field(VALUE_FIELD, + valueSchema, "The value", null))); + return schema; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java new file mode 100644 index 0000000..24ad6ab --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs; + +import org.apache.hadoop.fs.Path; + +import java.io.Serializable; + +/** + * A bucketer is used with a {@link RollingSink} + * to put emitted elements into rolling files. + * + * <p> + * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever + * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and + * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets + * based on system time. + * + * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.Bucketer} instead. + */ +@Deprecated +public interface Bucketer extends Serializable { + + /** + * Returns {@code true} when a new bucket should be started. + * + * @param currentBucketPath The bucket {@code Path} that is currently being used. + */ + boolean shouldStartNewBucket(Path basePath, Path currentBucketPath); + + /** + * Returns the {@link Path} of a new bucket file. + * + * @param basePath The base path containing all the buckets. + * + * @return The complete new {@code Path} of the new bucket. This should include the {@code basePath} + * and also the {@code subtaskIndex} to avoid clashes with parallel sinks.
+ */ + Path getNextBucketPath(Path basePath); +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java new file mode 100644 index 0000000..174707c --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs; + + +/** + * A clock that can provide the current time. + * + * <p> + * Normally this would be system time, but for testing a custom {@code Clock} can be provided. + */ +public interface Clock { + + /** + * Return the current system time in milliseconds. 
+ */ + public long currentTimeMillis(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java new file mode 100644 index 0000000..0df8998 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs; + +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * A {@link Bucketer} that assigns to buckets based on current system time. 
+ * + * <p> + * The {@code DateTimeBucketer} will create directories of the following form: + * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path + * that was specified as a base path when creating the + * {@link RollingSink}. The {@code dateTimePath} + * is determined based on the current system time and the user provided format string. + * + * <p> + * {@link SimpleDateFormat} is used to derive a date string from the current system time and + * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling + * files will have a granularity of hours. + * + * + * <p> + * Example: + * + * <pre>{@code + * Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH"); + * }</pre> + * + * This will create for example the following bucket path: + * {@code /base/1976-12-31--14/} + * + * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead. + */ +@Deprecated +public class DateTimeBucketer implements Bucketer { + + private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class); + + private static final long serialVersionUID = 1L; + + private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; + + // We have this so that we can manually set it for tests. + private static Clock clock = new SystemClock(); + + private final String formatString; + + private transient SimpleDateFormat dateFormatter; + + /** + * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}. + */ + public DateTimeBucketer() { + this(DEFAULT_FORMAT_STRING); + } + + /** + * Creates a new {@code DateTimeBucketer} with the given date/time format string. + * + * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine + * the bucket path. 
+ */ + public DateTimeBucketer(String formatString) { + this.formatString = formatString; + + this.dateFormatter = new SimpleDateFormat(formatString); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + this.dateFormatter = new SimpleDateFormat(formatString); + } + + + @Override + public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) { + String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); + return !(new Path(basePath, newDateTimeString).equals(currentBucketPath)); + } + + @Override + public Path getNextBucketPath(Path basePath) { + String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); + return new Path(basePath + "/" + newDateTimeString); + } + + @Override + public String toString() { + return "DateTimeBucketer{" + + "formatString='" + formatString + '\'' + + '}'; + } + + /** + * This sets the internal {@link Clock} implementation. This method should only be used for testing. + * + * @param newClock The new clock to set. 
+ */ + public static void setClock(Clock newClock) { + clock = newClock; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java new file mode 100644 index 0000000..6854596 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs; + +import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer; +import org.apache.hadoop.fs.Path; + +/** + * A {@link Bucketer} that does not perform any + * rolling of files. All files are written to the base path. + * + * @deprecated use {@link BasePathBucketer} instead. 
+ */ +@Deprecated +public class NonRollingBucketer implements Bucketer { + private static final long serialVersionUID = 1L; + + @Override + public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) { + return false; + } + + @Override + public Path getNextBucketPath(Path basePath) { + return basePath; + } + + @Override + public String toString() { + return "NonRollingBucketer"; + } +}
