Flume source
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bbdab0e8
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bbdab0e8
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bbdab0e8

Branch: refs/heads/master
Commit: bbdab0e8a417dd15813d947ef16bcc65bb01c7d7
Parents: c84a2c8
Author: Chetan Narsude <[email protected]>
Authored: Sun Feb 19 21:27:29 2017 +0530
Committer: Pramod Immaneni <[email protected]>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 flume/pom.xml                                    | 275 +++
 .../datatorrent/flume/discovery/Discovery.java   | 68 ++
 .../flume/discovery/ZKAssistedDiscovery.java     | 429 +++
 .../interceptor/ColumnFilteringInterceptor.java  | 204 +++
 .../operator/AbstractFlumeInputOperator.java     | 760 +++
 .../com/datatorrent/flume/sink/DTFlumeSink.java  | 571 ++++
 .../java/com/datatorrent/flume/sink/Server.java  | 419 ++++
 .../datatorrent/flume/source/TestSource.java     | 248 ++
 .../datatorrent/flume/storage/DebugWrapper.java  | 131 ++++
 .../flume/storage/ErrorMaskingEventCodec.java    | 61 ++
 .../datatorrent/flume/storage/EventCodec.java    | 91 +++
 .../flume-conf/flume-conf.sample.properties      | 45 ++
 .../resources/flume-conf/flume-env.sample.sh     | 36 +
 .../discovery/ZKAssistedDiscoveryTest.java       | 142 ++++
 .../flume/integration/ApplicationTest.java       | 116 +++
 .../ColumnFilteringInterceptorTest.java          | 85 +++
 .../interceptor/InterceptorTestHelper.java       | 214 ++++++
 .../datatorrent/flume/interceptor/RawEvent.java  | 119 +++
 .../AbstractFlumeInputOperatorTest.java          | 56 ++
 .../datatorrent/flume/sink/DTFlumeSinkTest.java  | 143 ++++
 .../com/datatorrent/flume/sink/ServerTest.java   | 92 +++
 .../resources/flume/conf/flume-conf.properties   | 85 +++
 .../src/test/resources/flume/conf/flume-env.sh   | 36 +
 flume/src/test/resources/log4j.properties        | 38 +
 .../test/resources/test_data/gentxns/2013121500  | Bin 0 -> 225010 bytes
 .../test/resources/test_data/gentxns/2013121501  | Bin 0 -> 224956 bytes
 .../test/resources/test_data/gentxns/2013121502  | Bin 0 -> 225028 bytes
 .../test/resources/test_data/gentxns/2013121503  | Bin 0 -> 225068 bytes
 .../test/resources/test_data/gentxns/2013121504  | Bin 0 -> 224845 bytes
 .../test/resources/test_data/gentxns/2013121505  | Bin 0 -> 225004 bytes
 .../test/resources/test_data/gentxns/2013121506  | Bin 0 -> 224929 bytes
 .../test/resources/test_data/gentxns/2013121507  | Bin 0 -> 224879 bytes
 .../test/resources/test_data/gentxns/2013121508  | Bin 0 -> 224963 bytes
 .../test/resources/test_data/gentxns/2013121509  | Bin 0 -> 224963 bytes
 .../test/resources/test_data/gentxns/2013121510  | Bin 0 -> 225007 bytes
 .../test/resources/test_data/gentxns/2013121511  | Bin 0 -> 224913 bytes
 .../test/resources/test_data/gentxns/2013121512  | Bin 0 -> 224929 bytes
 .../test/resources/test_data/gentxns/2013121513  | Bin 0 -> 225078 bytes
 .../test/resources/test_data/gentxns/2013121514  | Bin 0 -> 224882 bytes
 .../test/resources/test_data/gentxns/2013121515  | Bin 0 -> 224958 bytes
 .../test/resources/test_data/gentxns/2013121516  | Bin 0 -> 225032 bytes
 .../test/resources/test_data/gentxns/2013121517  | Bin 0 -> 225059 bytes
 .../test/resources/test_data/gentxns/2013121518  | Bin 0 -> 224890 bytes
 .../test/resources/test_data/gentxns/2013121519  | Bin 0 -> 225000 bytes
 .../test/resources/test_data/gentxns/2013121520  | Bin 0 -> 225064 bytes
 .../test/resources/test_data/gentxns/2013121521  | Bin 0 -> 225091 bytes
 46 files changed,
4464 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/pom.xml ---------------------------------------------------------------------- diff --git a/flume/pom.xml b/flume/pom.xml new file mode 100644 index 0000000..ade05a0 --- /dev/null +++ b/flume/pom.xml @@ -0,0 +1,275 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>dt-megh</artifactId> + <groupId>com.datatorrent</groupId> + <version>3.6.0-SNAPSHOT</version> + </parent> + + <artifactId>dt-flume</artifactId> + <packaging>jar</packaging> + <name>DataTorrent Flume Integration</name> + + <profiles> + <profile> + <id>release</id> + <properties> + <package.username>flume</package.username> + <rpm.skip>package</rpm.skip> + <rpm.phase>${rpm.skip}</rpm.phase> + </properties> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>rpm-maven-plugin</artifactId> + <version>2.1-alpha-4</version> + <executions> + <execution> + <phase>${rpm.phase}</phase> + <id>generate-sink-rpm</id> + <goals> + <goal>attached-rpm</goal> + </goals> + <configuration> + <license>Copyright © 2014 DataTorrent, Inc.</license> + <version>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.incrementalVersion}</version> + <release>${parsedVersion.qualifier}${parsedVersion.buildNumber}</release> + <workarea>target/sink-rpm</workarea> + <classifier>sink</classifier> + <name>datatorrent-flume-sink</name> + <distribution>DataTorrent Enterprise ${project.version}</distribution> + <group>Messaging Client Support</group> + <icon>src/main/resources/logo.gif</icon> + <packager>DataTorrent Build System</packager> + <prefix>${package.prefix}</prefix> + <changelogFile>src/changelog</changelogFile> + <defineStatements> + <defineStatement>_unpackaged_files_terminate_build 0</defineStatement> + </defineStatements> + <mappings> + <mapping> + <directory>${package.prefix}/flume-${project.version}/lib</directory> + <filemode>750</filemode> + <username>${package.username}</username> + <groupname>${package.groupname}</groupname> + <artifact></artifact> + <dependency> + <includes> + <include>org.apache.apex:apex-api:jar:${apex.core.version}</include> + <include>com.datatorrent:dt-netlet:jar:1.2.0</include> + <include>org.apache.apex:apex-common:jar:${apex.core.version}</include> + <include>com.esotericsoftware.kryo:kryo:jar:2.24.0</include> + <include>com.esotericsoftware.minlog:minlog:jar:1.2</include> + <include>org.objenesis:objenesis:jar:2.1</include> + 
<include>org.apache.curator:curator-client:jar:2.3.0</include> + <include>org.apache.curator:curator-x-discovery:jar:2.3.0</include> + <include>org.apache.curator:curator-framework:jar:2.3.0</include> + </includes> + </dependency> + </mapping> + <mapping> + <directory>${package.prefix}/flume-${project.version}/conf</directory> + <configuration>true</configuration> + <filemode>640</filemode> + <username>${package.username}</username> + <groupname>${package.groupname}</groupname> + <sources> + <source> + <location>src/main/resources/flume-conf</location> + </source> + </sources> + </mapping> + </mappings> + <preinstallScriptlet> + <script>groupadd -f ${package.groupname} && id ${package.username} >/dev/null 2>&1 && usermod -aG ${package.groupname} ${package.username} || useradd -g ${package.groupname} ${package.username}</script> + </preinstallScriptlet> + </configuration> + </execution> + + <execution> + <phase>${rpm.phase}</phase> + <id>generate-operator-rpm</id> + <goals> + <goal>attached-rpm</goal> + </goals> + <configuration> + <version>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.incrementalVersion}</version> + <license>Copyright © 2014 DataTorrent, Inc.</license> + <release>${parsedVersion.qualifier}${parsedVersion.buildNumber}</release> + <workarea>target/operator-rpm</workarea> + <classifier>operator</classifier> + <name>datatorrent-flume-operator</name> + <distribution>DataTorrent Enterprise ${project.version}</distribution> + <group>Messaging Client Support</group> + <icon>src/main/resources/logo.gif</icon> + <packager>DataTorrent Build System</packager> + <prefix>${package.prefix}</prefix> + <changelogFile>src/changelog</changelogFile> + <description>${rpm.release}</description> + <defineStatements> + <defineStatement>_unpackaged_files_terminate_build 0</defineStatement> + </defineStatements> + <mappings> + <mapping> + <directory>${package.prefix}/flume-operator-${project.version}/lib</directory> + <filemode>640</filemode> + <username>${package.username}</username> + <groupname>${package.groupname}</groupname> + <artifact></artifact> + <dependency> + <includes> + <include>org.apache.curator:curator-client:jar:2.3.0</include> + <include>org.apache.curator:curator-x-discovery:jar:2.3.0</include> + <include>org.apache.curator:curator-framework:jar:2.3.0</include> + <include>org.apache.flume:flume-ng-sdk:jar:1.5.0</include> + <include>org.apache.flume:flume-ng-core:jar:1.5.0</include> + <include>org.apache.flume:flume-ng-configuration:jar:1.5.0</include> + </includes> + </dependency> + </mapping> + </mappings> + <preinstallScriptlet> + <script>groupadd -f ${package.groupname} && id ${package.username} >/dev/null 2>&1 && usermod -aG ${package.groupname} ${package.username} || useradd -g ${package.groupname} ${package.username}</script> + </preinstallScriptlet> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.16</version> + <configuration> + <argLine>-Xmx5000M</argLine> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.2</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-common</artifactId> + <version>${apex.core.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + 
<artifactId>flume-ng-core</artifactId> + <version>1.5.0</version> + <exclusions> + <exclusion> + <!-- Curator requires later version of Guava --> + <artifactId>guava</artifactId> + <groupId>com.google.guava</groupId> + </exclusion> + <exclusion> + <artifactId>jackson-core-asl</artifactId> + <groupId>org.codehaus.jackson</groupId> + </exclusion> + <exclusion> + <artifactId>jackson-mapper-asl</artifactId> + <groupId>org.codehaus.jackson</groupId> + </exclusion> + <exclusion> + <artifactId>jetty</artifactId> + <groupId>org.mortbay.jetty</groupId> + </exclusion> + <exclusion> + <artifactId>jetty-util</artifactId> + <groupId>org.mortbay.jetty</groupId> + </exclusion> + <exclusion> + <artifactId>commons-codec</artifactId> + <groupId>commons-codec</groupId> + </exclusion> + <exclusion> + <artifactId>commons-io</artifactId> + <groupId>commons-io</groupId> + </exclusion> + <exclusion> + <artifactId>commons-lang</artifactId> + <groupId>commons-lang</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-api</artifactId> + <version>${apex.core.version}</version> + </dependency> + <dependency> + <groupId>com.datatorrent</groupId> + <artifactId>netlet</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-x-discovery</artifactId> + <version>2.3.0</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>11.0.2</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java b/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java new file mode 100644 index 0000000..d802002 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/discovery/Discovery.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.discovery; + +import java.util.Collection; + +/** + * When DTFlumeSink server instance binds to the network interface, it can publish + * its whereabouts by invoking advertise method on the Discovery object. Similarly + * when it ceases accepting any more connections, it can publish its intent to do + * so by invoking unadvertise.<p /> + * Interesting parties can call discover method to get the list of addresses where + * they can find an available DTFlumeSink server instance. + * + * @author Chetan Narsude <[email protected]> + * @param <T> - Type of the objects which can be discovered + * @since 0.9.3 + */ +public interface Discovery<T> +{ + /** + * Recall the previously published address as it's no longer valid. + * + * @param service + */ + void unadvertise(Service<T> service); + + /** + * Advertise the host/port address where DTFlumeSink is accepting a client connection. + * + * @param service + */ + void advertise(Service<T> service); + + /** + * Discover all the addresses which are actively accepting the client connections. + * + * @return - Active server addresses which can accept the connections. + */ + Collection<Service<T>> discover(); + + interface Service<T> + { + String getHost(); + + int getPort(); + + T getPayload(); + + String getId(); + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java b/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java new file mode 100644 index 0000000..460a478 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/discovery/ZKAssistedDiscovery.java @@ -0,0 +1,429 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.discovery; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; + +import javax.validation.constraints.NotNull; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.codehaus.jackson.map.ObjectWriter; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.utils.EnsurePath; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.InstanceSerializer; +import org.apache.flume.conf.Configurable; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Component; + +/** + * <p>ZKAssistedDiscovery class.</p> + * + * @author Chetan Narsude <[email protected]> + * @since 0.9.3 + */ +public class ZKAssistedDiscovery implements Discovery<byte[]>, + Component<com.datatorrent.api.Context>, Configurable, Serializable +{ + @NotNull + private String serviceName; + @NotNull + private String connectionString; + @NotNull + private String basePath; + private int connectionTimeoutMillis; + private int connectionRetryCount; + private int conntectionRetrySleepMillis; + private transient InstanceSerializerFactory instanceSerializerFactory; + private transient CuratorFramework curatorFramework; + private transient ServiceDiscovery<byte[]> discovery; + + public ZKAssistedDiscovery() + { + this.serviceName = "DTFlume"; + this.conntectionRetrySleepMillis = 500; + this.connectionRetryCount = 10; + this.connectionTimeoutMillis = 1000; + } + + @Override + public void unadvertise(Service<byte[]> service) + { + doAdvertise(service, false); + } + + @Override + public void advertise(Service<byte[]> service) + { + doAdvertise(service, true); + } + + public void doAdvertise(Service<byte[]> service, boolean flag) + { + try { + new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient()); + + ServiceInstance<byte[]> instance = getInstance(service); + if (flag) { + discovery.registerService(instance); + } else { + discovery.unregisterService(instance); + } + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public Collection<Service<byte[]>> discover() + { + try { + new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient()); + + Collection<ServiceInstance<byte[]>> services = discovery.queryForInstances(serviceName); + ArrayList<Service<byte[]>> returnable = new ArrayList<Service<byte[]>>(services.size()); + for (final ServiceInstance<byte[]> service : services) { + returnable.add(new Service<byte[]>() + { + @Override + public String getHost() + { + return service.getAddress(); + } + + @Override + public int getPort() + { + return service.getPort(); + } + + @Override + public byte[] getPayload() + { + return service.getPayload(); + } + + @Override + public String getId() + { + return service.getId(); + } + + @Override + public String toString() + { + return "{" + getId() + " => " + getHost() + ':' + getPort() + '}'; + } + + }); + } + return returnable; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public String toString() + { 
+ return "ZKAssistedDiscovery{" + "serviceName=" + serviceName + ", connectionString=" + connectionString + + ", basePath=" + basePath + ", connectionTimeoutMillis=" + connectionTimeoutMillis + ", connectionRetryCount=" + + connectionRetryCount + ", conntectionRetrySleepMillis=" + conntectionRetrySleepMillis + '}'; + } + + @Override + public int hashCode() + { + int hash = 7; + hash = 47 * hash + this.serviceName.hashCode(); + hash = 47 * hash + this.connectionString.hashCode(); + hash = 47 * hash + this.basePath.hashCode(); + hash = 47 * hash + this.connectionTimeoutMillis; + hash = 47 * hash + this.connectionRetryCount; + hash = 47 * hash + this.conntectionRetrySleepMillis; + return hash; + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ZKAssistedDiscovery other = (ZKAssistedDiscovery)obj; + if (!this.serviceName.equals(other.serviceName)) { + return false; + } + if (!this.connectionString.equals(other.connectionString)) { + return false; + } + if (!this.basePath.equals(other.basePath)) { + return false; + } + if (this.connectionTimeoutMillis != other.connectionTimeoutMillis) { + return false; + } + if (this.connectionRetryCount != other.connectionRetryCount) { + return false; + } + if (this.conntectionRetrySleepMillis != other.conntectionRetrySleepMillis) { + return false; + } + return true; + } + + ServiceInstance<byte[]> getInstance(Service<byte[]> service) throws Exception + { + return ServiceInstance.<byte[]>builder() + .name(serviceName) + .address(service.getHost()) + .port(service.getPort()) + .id(service.getId()) + .payload(service.getPayload()) + .build(); + } + + private ServiceDiscovery<byte[]> getDiscovery(CuratorFramework curatorFramework) + { + return ServiceDiscoveryBuilder.builder(byte[].class) + .basePath(basePath) + .client(curatorFramework) + .serializer(instanceSerializerFactory.getInstanceSerializer( + new TypeReference<ServiceInstance<byte[]>>() + {})).build(); + } + + /** + * @return the instanceSerializerFactory + */ + InstanceSerializerFactory getInstanceSerializerFactory() + { + return instanceSerializerFactory; + } + + /** + * @return the connectionString + */ + public String getConnectionString() + { + return connectionString; + } + + /** + * @param connectionString the connectionString to set + */ + public void setConnectionString(String connectionString) + { + this.connectionString = connectionString; + } + + /** + * @return the basePath + */ + public String getBasePath() + { + return basePath; + } + + /** + * @param basePath the basePath to set + */ + public void setBasePath(String basePath) + { + this.basePath = basePath; + } + + /** + * @return the connectionTimeoutMillis + */ + public int getConnectionTimeoutMillis() + { + return connectionTimeoutMillis; + } + + /** + * @param connectionTimeoutMillis the connectionTimeoutMillis to set + */ + public void setConnectionTimeoutMillis(int connectionTimeoutMillis) + { + this.connectionTimeoutMillis = connectionTimeoutMillis; + } + + /** + * @return the connectionRetryCount + */ + public int getConnectionRetryCount() + { + return connectionRetryCount; + } + + /** + * @param connectionRetryCount the connectionRetryCount to set + */ + public void setConnectionRetryCount(int connectionRetryCount) + { + this.connectionRetryCount = connectionRetryCount; + } + + /** + * @return the conntectionRetrySleepMillis + */ + public int getConntectionRetrySleepMillis() + { + return conntectionRetrySleepMillis; + 
} + + /** + * @param conntectionRetrySleepMillis the conntectionRetrySleepMillis to set + */ + public void setConntectionRetrySleepMillis(int conntectionRetrySleepMillis) + { + this.conntectionRetrySleepMillis = conntectionRetrySleepMillis; + } + + /** + * @return the serviceName + */ + public String getServiceName() + { + return serviceName; + } + + /** + * @param serviceName the serviceName to set + */ + public void setServiceName(String serviceName) + { + this.serviceName = serviceName; + } + + @Override + public void configure(org.apache.flume.Context context) + { + serviceName = context.getString("serviceName", "DTFlume"); + connectionString = context.getString("connectionString"); + basePath = context.getString("basePath"); + + connectionTimeoutMillis = context.getInteger("connectionTimeoutMillis", 1000); + connectionRetryCount = context.getInteger("connectionRetryCount", 10); + conntectionRetrySleepMillis = context.getInteger("connectionRetrySleepMillis", 500); + } + + @Override + public void setup(com.datatorrent.api.Context context) + { + ObjectMapper om = new ObjectMapper(); + instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer()); + + curatorFramework = CuratorFrameworkFactory.builder() + .connectionTimeoutMs(connectionTimeoutMillis) + .retryPolicy(new RetryNTimes(connectionRetryCount, conntectionRetrySleepMillis)) + .connectString(connectionString) + .build(); + curatorFramework.start(); + + discovery = getDiscovery(curatorFramework); + try { + discovery.start(); + } catch (Exception ex) { + Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + try { + discovery.close(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + curatorFramework.close(); + curatorFramework = null; + } + } + + public class InstanceSerializerFactory + { + private final ObjectReader objectReader; + private final ObjectWriter objectWriter; + + InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter) + { + this.objectReader = objectReader; + this.objectWriter = objectWriter; + } + + public <T> InstanceSerializer<T> getInstanceSerializer( + TypeReference<ServiceInstance<T>> typeReference) + { + return new JacksonInstanceSerializer<T>(objectReader, objectWriter, typeReference); + } + + final class JacksonInstanceSerializer<T> implements InstanceSerializer<T> + { + private final TypeReference<ServiceInstance<T>> typeRef; + private final ObjectWriter objectWriter; + private final ObjectReader objectReader; + + JacksonInstanceSerializer(ObjectReader objectReader, ObjectWriter objectWriter, + TypeReference<ServiceInstance<T>> typeRef) + { + this.objectReader = objectReader; + this.objectWriter = objectWriter; + this.typeRef = typeRef; + } + + @Override + public ServiceInstance<T> deserialize(byte[] bytes) throws Exception + { + return objectReader.withType(typeRef).readValue(bytes); + } + + @Override + public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + objectWriter.writeValue(out, serviceInstance); + return out.toByteArray(); + } + + } + + } + + private static final long serialVersionUID = 201401221145L; + private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscovery.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java ---------------------------------------------------------------------- diff 
--git a/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java new file mode 100644 index 0000000..90c3a04 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptor.java @@ -0,0 +1,204 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.interceptor; + +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; + +import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.COLUMNS; +import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR; +import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR_DFLT; +import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR; +import static com.datatorrent.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT; + +/** + * <p>ColumnFilteringInterceptor class.</p> + * + * @author Chetan Narsude <[email protected]> + * @since 0.9.4 + */ +public class ColumnFilteringInterceptor implements Interceptor +{ + private final byte srcSeparator; + private final byte dstSeparator; + + private final int maxIndex; + private final int maxColumn; + private final int[] columns; + private final int[] positions; + + private ColumnFilteringInterceptor(int[] columns, byte srcSeparator, byte dstSeparator) + { + this.columns = columns; + + int tempMaxColumn = Integer.MIN_VALUE; + for (int column: columns) { + if (column > tempMaxColumn) { + tempMaxColumn = column; + } + } + maxIndex = tempMaxColumn; + maxColumn = tempMaxColumn + 1; + positions = new int[maxColumn + 1]; + + this.srcSeparator = srcSeparator; + this.dstSeparator = dstSeparator; + } + + @Override + public void initialize() + { + /* no-op */ + } + + @Override + public Event intercept(Event event) + { + byte[] body = event.getBody(); + if (body == null) { + return event; + } + + final int length = body.length; + + /* store positions of character after the separators */ + int i = 0; + int index = 0; + while (i < length) { + if (body[i++] == srcSeparator) { + positions[++index] = i; + if (index >= maxIndex) { + break; + } + } + } + + int nextVirginIndex; + boolean separatorTerminated; + if (i == length && index < maxColumn) { + nextVirginIndex = index + 2; + positions[nextVirginIndex - 1] = length; + separatorTerminated = length > 0 ? 
body[length - 1] != srcSeparator : false; + } else { + nextVirginIndex = index + 1; + separatorTerminated = true; + } + + int newArrayLen = 0; + for (i = columns.length; i-- > 0;) { + int column = columns[i]; + int len = positions[column + 1] - positions[column]; + if (len <= 0) { + newArrayLen++; + } else { + if (separatorTerminated && positions[column + 1] == length) { + newArrayLen++; + } + newArrayLen += len; + } + } + + byte[] newbody = new byte[newArrayLen]; + int newoffset = 0; + for (int column: columns) { + int len = positions[column + 1] - positions[column]; + if (len > 0) { + System.arraycopy(body, positions[column], newbody, newoffset, len); + newoffset += len; + if (newbody[newoffset - 1] == srcSeparator) { + newbody[newoffset - 1] = dstSeparator; + } else { + newbody[newoffset++] = dstSeparator; + } + } else { + newbody[newoffset++] = dstSeparator; + } + } + + event.setBody(newbody); + Arrays.fill(positions, 1, nextVirginIndex, 0); + return event; + } + + @Override + public List<Event> intercept(List<Event> events) + { + for (Event event: events) { + intercept(event); + } + return events; + } + + @Override + public void close() + { + } + + public static class Builder implements Interceptor.Builder + { + private int[] columns; + private byte srcSeparator; + private byte dstSeparator; + + @Override + public Interceptor build() + { + return new ColumnFilteringInterceptor(columns, srcSeparator, dstSeparator); + } + + @Override + public void configure(Context context) + { + String sColumns = context.getString(COLUMNS); + if (sColumns == null || sColumns.trim().isEmpty()) { + throw new Error("This interceptor requires filtered columns to be specified!"); + } + + String[] parts = sColumns.split(" "); + columns = new int[parts.length]; + for (int i = parts.length; i-- > 0;) { + columns[i] = Integer.parseInt(parts[i]); + } + + srcSeparator = context.getInteger(SRC_SEPARATOR, (int)SRC_SEPARATOR_DFLT).byteValue(); + dstSeparator = context.getInteger(DST_SEPARATOR, (int)DST_SEPARATOR_DFLT).byteValue(); + } + + } + + @SuppressWarnings("ClassMayBeInterface") /* adhering to flume until i understand it completely */ + + public static class Constants + { + public static final String SRC_SEPARATOR = "srcSeparator"; + public static final byte SRC_SEPARATOR_DFLT = 2; + + public static final String DST_SEPARATOR = "dstSeparator"; + public static final byte DST_SEPARATOR_DFLT = 1; + + public static final String COLUMNS = "columns"; + } + + private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringInterceptor.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java new file mode 100644 index 0000000..1ab7182 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/operator/AbstractFlumeInputOperator.java @@ -0,0 +1,760 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.operator; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Event; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.Stats.OperatorStats; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.flume.discovery.Discovery.Service; +import com.datatorrent.flume.discovery.ZKAssistedDiscovery; +import com.datatorrent.flume.sink.Server; +import com.datatorrent.flume.sink.Server.Command; +import com.datatorrent.flume.sink.Server.Request; +import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.util.Slice; + +import static java.lang.Thread.sleep; + +/** + * <p> + * Abstract AbstractFlumeInputOperator class.</p> + * + * @param <T> Type of the output payload. 
+ * @author Chetan Narsude <[email protected]> + * @since 0.9.2 + */ +public abstract class AbstractFlumeInputOperator<T> + implements InputOperator, Operator.ActivationListener<OperatorContext>, Operator.IdleTimeHandler, + Operator.CheckpointListener, Partitioner<AbstractFlumeInputOperator<T>> +{ + public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>(); + public final transient DefaultOutputPort<Slice> drop = new DefaultOutputPort<Slice>(); + @NotNull + private String[] connectionSpecs; + @NotNull + private StreamCodec<Event> codec; + private final ArrayList<RecoveryAddress> recoveryAddresses; + @SuppressWarnings("FieldMayBeFinal") // it's not final because that mucks with the serialization somehow + private transient ArrayBlockingQueue<Slice> handoverBuffer; + private transient int idleCounter; + private transient int eventCounter; + private transient DefaultEventLoop eventloop; + private transient volatile boolean connected; + private transient OperatorContext context; + private transient Client client; + private transient long windowId; + private transient byte[] address; + @Min(0) + private long maxEventsPerSecond; + //This is calculated from maxEventsPerSecond, App window count and streaming window size + private transient long maxEventsPerWindow; + + public AbstractFlumeInputOperator() + { + handoverBuffer = new ArrayBlockingQueue<Slice>(1024 * 5); + connectionSpecs = new String[0]; + recoveryAddresses = new ArrayList<RecoveryAddress>(); + maxEventsPerSecond = Long.MAX_VALUE; + } + + @Override + public void setup(OperatorContext context) + { + long windowDurationMillis = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); + maxEventsPerWindow = (long)(windowDurationMillis / 1000.0 * maxEventsPerSecond); + logger.debug("max-events per-second {} per-window {}", maxEventsPerSecond, maxEventsPerWindow); + + try { + eventloop = new DefaultEventLoop("EventLoop-" + context.getId()); + eventloop.start(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + @SuppressWarnings({"unchecked"}) + public void activate(OperatorContext ctx) + { + if (connectionSpecs.length == 0) { + logger.info("Discovered zero DTFlumeSink"); + } else if (connectionSpecs.length == 1) { + for (String connectAddresse: connectionSpecs) { + logger.debug("Connection spec is {}", connectAddresse); + String[] parts = connectAddresse.split(":"); + eventloop.connect(new InetSocketAddress(parts[1], Integer.parseInt(parts[2])), client = new Client(parts[0])); + } + } else { + throw new IllegalArgumentException( + String.format("A physical %s operator cannot connect to more than 1 addresses!", + this.getClass().getSimpleName())); + } + + context = ctx; + } + + @Override + public void beginWindow(long windowId) + { + this.windowId = windowId; + idleCounter = 0; + eventCounter = 0; + } + + @Override + public void emitTuples() + { + int i = handoverBuffer.size(); + if (i > 0 && eventCounter < maxEventsPerWindow) { + + while (--i > 0 && eventCounter < maxEventsPerWindow - 1) { + final Slice slice = handoverBuffer.poll(); + slice.offset += 8; + slice.length -= 8; + T convert = convert((Event)codec.fromByteArray(slice)); + if (convert == null) { + drop.emit(slice); + } else { + output.emit(convert); + } + eventCounter++; + } + + final Slice slice = handoverBuffer.poll(); + slice.offset += 8; + slice.length -= 8; + T convert = convert((Event)codec.fromByteArray(slice)); + if (convert == null) { + 
drop.emit(slice); + } else { + output.emit(convert); + } + eventCounter++; + + address = Arrays.copyOfRange(slice.buffer, slice.offset - 8, slice.offset); + } + } + + @Override + public void endWindow() + { + if (connected) { + byte[] array = new byte[Request.FIXED_SIZE]; + + array[0] = Command.WINDOWED.getOrdinal(); + Server.writeInt(array, 1, eventCounter); + Server.writeInt(array, 5, idleCounter); + Server.writeLong(array, Request.TIME_OFFSET, System.currentTimeMillis()); + + logger.debug("wrote {} with eventCounter = {} and idleCounter = {}", Command.WINDOWED, eventCounter, idleCounter); + client.write(array); + } + + if (address != null) { + RecoveryAddress rAddress = new RecoveryAddress(); + rAddress.address = address; + address = null; + rAddress.windowId = windowId; + recoveryAddresses.add(rAddress); + } + } + + @Override + public void deactivate() + { + if (connected) { + eventloop.disconnect(client); + } + context = null; + } + + @Override + public void teardown() + { + eventloop.stop(); + eventloop = null; + } + + @Override + public void handleIdleTime() + { + idleCounter++; + try { + sleep(context.getValue(OperatorContext.SPIN_MILLIS)); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + public abstract T convert(Event event); + + /** + * @return the connectAddress + */ + public String[] getConnectAddresses() + { + return connectionSpecs.clone(); + } + + /** + * @param specs - sinkid:host:port specification of all the sinks. + */ + public void setConnectAddresses(String[] specs) + { + this.connectionSpecs = specs.clone(); + } + + /** + * @return the codec + */ + public StreamCodec<Event> getCodec() + { + return codec; + } + + /** + * @param codec the codec to set + */ + public void setCodec(StreamCodec<Event> codec) + { + this.codec = codec; + } + + private static class RecoveryAddress implements Serializable + { + long windowId; + byte[] address; + + @Override + public String toString() + { + return "RecoveryAddress{" + "windowId=" + windowId + ", address=" + Arrays.toString(address) + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof RecoveryAddress)) { + return false; + } + + RecoveryAddress that = (RecoveryAddress)o; + + if (windowId != that.windowId) { + return false; + } + return Arrays.equals(address, that.address); + } + + @Override + public int hashCode() + { + int result = (int)(windowId ^ (windowId >>> 32)); + result = 31 * result + (address != null ? 
Arrays.hashCode(address) : 0); + return result; + } + + private static final long serialVersionUID = 201312021432L; + } + + @Override + public void checkpointed(long windowId) + { + /* dont do anything */ + } + + @Override + public void committed(long windowId) + { + if (!connected) { + return; + } + + synchronized (recoveryAddresses) { + byte[] addr = null; + + Iterator<RecoveryAddress> iterator = recoveryAddresses.iterator(); + while (iterator.hasNext()) { + RecoveryAddress ra = iterator.next(); + if (ra.windowId > windowId) { + break; + } + + iterator.remove(); + if (ra.address != null) { + addr = ra.address; + } + } + + if (addr != null) { + /* + * Make sure that we store the last valid address processed + */ + if (recoveryAddresses.isEmpty()) { + RecoveryAddress ra = new RecoveryAddress(); + ra.address = addr; + recoveryAddresses.add(ra); + } + + int arraySize = 1/* for the type of the message */ + + 8 /* for the location to commit */ + + 8 /* for storing the current time stamp*/; + byte[] array = new byte[arraySize]; + + array[0] = Command.COMMITTED.getOrdinal(); + System.arraycopy(addr, 0, array, 1, 8); + Server.writeLong(array, Request.TIME_OFFSET, System.currentTimeMillis()); + logger.debug("wrote {} with recoveryOffset = {}", Command.COMMITTED, Arrays.toString(addr)); + client.write(array); + } + } + } + + @Override + public Collection<Partition<AbstractFlumeInputOperator<T>>> definePartitions( + Collection<Partition<AbstractFlumeInputOperator<T>>> partitions, PartitioningContext context) + { + Collection<Service<byte[]>> discovered = discoveredFlumeSinks.get(); + if (discovered == null) { + return partitions; + } + + HashMap<String, ArrayList<RecoveryAddress>> allRecoveryAddresses = abandonedRecoveryAddresses.get(); + ArrayList<String> allConnectAddresses = new ArrayList<String>(partitions.size()); + for (Partition<AbstractFlumeInputOperator<T>> partition: partitions) { + String[] lAddresses = partition.getPartitionedInstance().connectionSpecs; + allConnectAddresses.addAll(Arrays.asList(lAddresses)); + for (int i = lAddresses.length; i-- > 0;) { + String[] parts = lAddresses[i].split(":", 2); + allRecoveryAddresses.put(parts[0], partition.getPartitionedInstance().recoveryAddresses); + } + } + + HashMap<String, String> connections = new HashMap<String, String>(discovered.size()); + for (Service<byte[]> service: discovered) { + String previousSpec = connections.get(service.getId()); + String newspec = service.getId() + ':' + service.getHost() + ':' + service.getPort(); + if (previousSpec == null) { + connections.put(service.getId(), newspec); + } else { + boolean found = false; + for (ConnectionStatus cs: partitionedInstanceStatus.get().values()) { + if (previousSpec.equals(cs.spec) && !cs.connected) { + connections.put(service.getId(), newspec); + found = true; + break; + } + } + + if (!found) { + logger.warn("2 sinks found with the same id: {} and {}... 
Ignoring previous.", previousSpec, newspec); + connections.put(service.getId(), newspec); + } + } + } + + for (int i = allConnectAddresses.size(); i-- > 0;) { + String[] parts = allConnectAddresses.get(i).split(":"); + String connection = connections.remove(parts[0]); + if (connection == null) { + allConnectAddresses.remove(i); + } else { + allConnectAddresses.set(i, connection); + } + } + + allConnectAddresses.addAll(connections.values()); + + partitions.clear(); + try { + if (allConnectAddresses.isEmpty()) { + /* return at least one of them; otherwise stram becomes grumpy */ + @SuppressWarnings("unchecked") + AbstractFlumeInputOperator<T> operator = getClass().newInstance(); + operator.setCodec(codec); + operator.setMaxEventsPerSecond(maxEventsPerSecond); + for (ArrayList<RecoveryAddress> lRecoveryAddresses: allRecoveryAddresses.values()) { + operator.recoveryAddresses.addAll(lRecoveryAddresses); + } + operator.connectionSpecs = new String[allConnectAddresses.size()]; + for (int i = connectionSpecs.length; i-- > 0;) { + connectionSpecs[i] = allConnectAddresses.get(i); + } + + partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator)); + } else { + long maxEventsPerSecondPerOperator = maxEventsPerSecond / allConnectAddresses.size(); + for (int i = allConnectAddresses.size(); i-- > 0;) { + @SuppressWarnings("unchecked") + AbstractFlumeInputOperator<T> operator = getClass().newInstance(); + operator.setCodec(codec); + operator.setMaxEventsPerSecond(maxEventsPerSecondPerOperator); + String connectAddress = allConnectAddresses.get(i); + operator.connectionSpecs = new String[] {connectAddress}; + + String[] parts = connectAddress.split(":", 2); + ArrayList<RecoveryAddress> remove = allRecoveryAddresses.remove(parts[0]); + if (remove != null) { + operator.recoveryAddresses.addAll(remove); + } + + partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator)); + } + } + } catch (IllegalAccessException ex) { + throw new RuntimeException(ex); + } catch (InstantiationException ex) { + throw new RuntimeException(ex); + } + + logger.debug("Requesting partitions: {}", partitions); + return partitions; + } + + @Override + public void partitioned(Map<Integer, Partition<AbstractFlumeInputOperator<T>>> partitions) + { + logger.debug("Partitioned Map: {}", partitions); + HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get(); + map.clear(); + for (Entry<Integer, Partition<AbstractFlumeInputOperator<T>>> entry: partitions.entrySet()) { + if (map.containsKey(entry.getKey())) { + // what can be done here? + } else { + map.put(entry.getKey(), null); + } + } + } + + @Override + public String toString() + { + return "AbstractFlumeInputOperator{" + "connected=" + connected + ", connectionSpecs=" + + (connectionSpecs.length == 0 ? 
"empty" : connectionSpecs[0]) + ", recoveryAddresses=" + recoveryAddresses + '}'; + } + + class Client extends AbstractLengthPrependerClient + { + private final String id; + + Client(String id) + { + this.id = id; + } + + @Override + public void onMessage(byte[] buffer, int offset, int size) + { + try { + handoverBuffer.put(new Slice(buffer, offset, size)); + } catch (InterruptedException ex) { + handleException(ex, eventloop); + } + } + + @Override + public void connected() + { + super.connected(); + + byte[] address; + synchronized (recoveryAddresses) { + if (recoveryAddresses.size() > 0) { + address = recoveryAddresses.get(recoveryAddresses.size() - 1).address; + } else { + address = new byte[8]; + } + } + + int len = 1 /* for the message type SEEK */ + + 8 /* for the address */ + + 8 /* for storing the current time stamp*/; + + byte[] array = new byte[len]; + array[0] = Command.SEEK.getOrdinal(); + System.arraycopy(address, 0, array, 1, 8); + Server.writeLong(array, 9, System.currentTimeMillis()); + write(array); + + connected = true; + ConnectionStatus connectionStatus = new ConnectionStatus(); + connectionStatus.connected = true; + connectionStatus.spec = connectionSpecs[0]; + OperatorContext ctx = context; + synchronized (ctx) { + logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus); + context.setCounters(connectionStatus); + } + } + + @Override + public void disconnected() + { + connected = false; + ConnectionStatus connectionStatus = new ConnectionStatus(); + connectionStatus.connected = false; + connectionStatus.spec = connectionSpecs[0]; + OperatorContext ctx = context; + synchronized (ctx) { + logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus); + context.setCounters(connectionStatus); + } + super.disconnected(); + } + + } + + public static class ZKStatsListner extends ZKAssistedDiscovery implements com.datatorrent.api.StatsListener, + Serializable + { + /* + * In the current design, one input operator is able to connect + * to only one flume adapter. Sometime in future, we should support + * any number of input operators connecting to any number of flume + * sinks and vice a versa. + * + * Until that happens the following map should be sufficient to + * keep track of which input operator is connected to which flume sink. 
+ */ + long intervalMillis; + private final Response response; + private transient long nextMillis; + + public ZKStatsListner() + { + intervalMillis = 60 * 1000L; + response = new Response(); + } + + @Override + public Response processStats(BatchedOperatorStats stats) + { + final HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get(); + response.repartitionRequired = false; + + Object lastStat = null; + List<OperatorStats> lastWindowedStats = stats.getLastWindowedStats(); + for (OperatorStats os: lastWindowedStats) { + if (os.counters != null) { + lastStat = os.counters; + logger.debug("Received custom stats = {}", lastStat); + } + } + + if (lastStat instanceof ConnectionStatus) { + ConnectionStatus cs = (ConnectionStatus)lastStat; + map.put(stats.getOperatorId(), cs); + if (!cs.connected) { + logger.debug("setting repatitioned = true because of lastStat = {}", lastStat); + response.repartitionRequired = true; + } + } + + if (System.currentTimeMillis() >= nextMillis) { + logger.debug("nextMillis = {}", nextMillis); + try { + super.setup(null); + Collection<Service<byte[]>> addresses; + try { + addresses = discover(); + } finally { + super.teardown(); + } + AbstractFlumeInputOperator.discoveredFlumeSinks.set(addresses); + logger.debug("\ncurrent map = {}\ndiscovered sinks = {}", map, addresses); + switch (addresses.size()) { + case 0: + response.repartitionRequired = map.size() != 1; + break; + + default: + if (addresses.size() == map.size()) { + for (ConnectionStatus value: map.values()) { + if (value == null || !value.connected) { + response.repartitionRequired = true; + break; + } + } + } else { + response.repartitionRequired = true; + } + break; + } + } catch (Error er) { + throw er; + } catch (Throwable cause) { + logger.warn("Unable to discover services, using values from last successful discovery", cause); + } finally { + nextMillis = System.currentTimeMillis() + intervalMillis; + logger.debug("Proposed NextMillis = {}", nextMillis); + } + } + + return response; + } + + /** + * @return the intervalMillis + */ + public long getIntervalMillis() + { + return intervalMillis; + } + + /** + * @param intervalMillis the intervalMillis to set + */ + public void setIntervalMillis(long intervalMillis) + { + this.intervalMillis = intervalMillis; + } + + private static final long serialVersionUID = 201312241646L; + } + + public static class ConnectionStatus implements Serializable + { + int id; + String spec; + boolean connected; + + @Override + public int hashCode() + { + return spec.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ConnectionStatus other = (ConnectionStatus)obj; + return spec == null ? other.spec == null : spec.equals(other.spec); + } + + @Override + public String toString() + { + return "ConnectionStatus{" + "id=" + id + ", spec=" + spec + ", connected=" + connected + '}'; + } + + private static final long serialVersionUID = 201312261615L; + } + + private static final transient ThreadLocal<HashMap<Integer, ConnectionStatus>> partitionedInstanceStatus = + new ThreadLocal<HashMap<Integer, ConnectionStatus>>() + { + @Override + protected HashMap<Integer, ConnectionStatus> initialValue() + { + return new HashMap<Integer, ConnectionStatus>(); + } + + }; + /** + * When a sink goes away and a replacement sink is not found, we stash the recovery addresses associated + * with the sink in a hope that the new sink may show up in near future. 
+ */ + private static final transient ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>> abandonedRecoveryAddresses = + new ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>>() + { + @Override + protected HashMap<String, ArrayList<RecoveryAddress>> initialValue() + { + return new HashMap<String, ArrayList<RecoveryAddress>>(); + } + + }; + private static final transient ThreadLocal<Collection<Service<byte[]>>> discoveredFlumeSinks = + new ThreadLocal<Collection<Service<byte[]>>>(); + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof AbstractFlumeInputOperator)) { + return false; + } + + AbstractFlumeInputOperator<?> that = (AbstractFlumeInputOperator<?>)o; + + if (!Arrays.equals(connectionSpecs, that.connectionSpecs)) { + return false; + } + return recoveryAddresses.equals(that.recoveryAddresses); + + } + + @Override + public int hashCode() + { + int result = connectionSpecs != null ? Arrays.hashCode(connectionSpecs) : 0; + result = 31 * result + (recoveryAddresses.hashCode()); + return result; + } + + public void setMaxEventsPerSecond(long maxEventsPerSecond) + { + this.maxEventsPerSecond = maxEventsPerSecond; + } + + public long getMaxEventsPerSecond() + { + return maxEventsPerSecond; + } + + private static final Logger logger = LoggerFactory.getLogger(AbstractFlumeInputOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java b/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java new file mode 100644 index 0000000..35d0c5f --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/sink/DTFlumeSink.java @@ -0,0 +1,571 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.sink; + +import java.io.IOError; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ServiceConfigurationError; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.sink.AbstractSink; + +import com.datatorrent.api.Component; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.flume.discovery.Discovery; +import com.datatorrent.flume.sink.Server.Client; +import com.datatorrent.flume.sink.Server.Request; +import com.datatorrent.flume.storage.EventCodec; +import com.datatorrent.flume.storage.Storage; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.NetletThrowable; +import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException; +import com.datatorrent.netlet.util.Slice; + +/** + * DTFlumeSink is a flume sink developed to ingest the data into DataTorrent DAG + * from flume. It's essentially a flume sink which acts as a server capable of + * talking to one client at a time. The client for this server is AbstractFlumeInputOperator. + * <p /> + * <experimental>DTFlumeSink auto adjusts the rate at which it consumes the data from channel to + * match the throughput of the DAG.</experimental> + * <p /> + * The properties you can set on the DTFlumeSink are: <br /> + * id - string unique value identifying this sink <br /> + * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br /> + * port - integer value indicating the numeric port to which the server should bind <br /> + * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events + * before checking for next event again <br /> + * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be + * adjusted upward or downward at a time <br /> + * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br /> + * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. 
This value can + * not be more than channel's transaction capacity.<br /> + * + * @author Chetan Narsude <[email protected]> + * @since 0.9.2 + */ +public class DTFlumeSink extends AbstractSink implements Configurable +{ + private static final String HOSTNAME_STRING = "hostname"; + private static final String HOSTNAME_DEFAULT = "locahost"; + private static final long ACCEPTED_TOLERANCE = 20000; + private DefaultEventLoop eventloop; + private Server server; + private int outstandingEventsCount; + private int lastConsumedEventsCount; + private int idleCount; + private byte[] playback; + private Client client; + private String hostname; + private int port; + private String id; + private long acceptedTolerance; + private long sleepMillis; + private double throughputAdjustmentFactor; + private int minimumEventsPerTransaction; + private int maximumEventsPerTransaction; + private long commitEventTimeoutMillis; + private transient long lastCommitEventTimeMillis; + private Storage storage; + Discovery<byte[]> discovery; + StreamCodec<Event> codec; + /* Begin implementing Flume Sink interface */ + + @Override + @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"}) + public Status process() throws EventDeliveryException + { + Slice slice; + synchronized (server.requests) { + for (Request r : server.requests) { + logger.debug("found {}", r); + switch (r.type) { + case SEEK: + lastCommitEventTimeMillis = System.currentTimeMillis(); + slice = r.getAddress(); + playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length)); + client = r.client; + break; + + case COMMITTED: + lastCommitEventTimeMillis = System.currentTimeMillis(); + slice = r.getAddress(); + storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length)); + break; + + case CONNECTED: + logger.debug("Connected received, ignoring it!"); + break; + + case DISCONNECTED: + if (r.client == client) { + client = null; + outstandingEventsCount = 0; + } + break; + + case WINDOWED: + lastConsumedEventsCount = r.getEventCount(); + idleCount = r.getIdleCount(); + outstandingEventsCount -= lastConsumedEventsCount; + break; + + case SERVER_ERROR: + throw new IOError(null); + + default: + logger.debug("Cannot understand the request {}", r); + break; + } + } + + server.requests.clear(); + } + + if (client == null) { + logger.info("No client expressed interest yet to consume the events."); + return Status.BACKOFF; + } else if (System.currentTimeMillis() - lastCommitEventTimeMillis > commitEventTimeoutMillis) { + logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.", + System.currentTimeMillis() - lastCommitEventTimeMillis); + return Status.BACKOFF; + } + + int maxTuples; + // the following logic needs to be fixed... this is a quick put together. 
+ if (outstandingEventsCount < 0) { + if (idleCount > 1) { + maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount); + } else { + maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount); + } + } else if (outstandingEventsCount > lastConsumedEventsCount) { + maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount); + } else { + if (idleCount > 0) { + maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount); + if (maxTuples <= 0) { + maxTuples = minimumEventsPerTransaction; + } + } else { + maxTuples = lastConsumedEventsCount; + } + } + + if (maxTuples >= maximumEventsPerTransaction) { + maxTuples = maximumEventsPerTransaction; + } else if (maxTuples <= 0) { + maxTuples = minimumEventsPerTransaction; + } + + if (maxTuples > 0) { + if (playback != null) { + try { + int i = 0; + do { + if (!client.write(playback)) { + retryWrite(playback, null); + } + outstandingEventsCount++; + playback = storage.retrieveNext(); + } + while (++i < maxTuples && playback != null); + } catch (Exception ex) { + logger.warn("Playback Failed", ex); + if (ex instanceof NetletThrowable) { + try { + eventloop.disconnect(client); + } finally { + client = null; + outstandingEventsCount = 0; + } + } + return Status.BACKOFF; + } + } else { + int storedTuples = 0; + + Transaction t = getChannel().getTransaction(); + try { + t.begin(); + + Event e; + while (storedTuples < maxTuples && (e = getChannel().take()) != null) { + Slice event = codec.toByteArray(e); + byte[] address = storage.store(event); + if (address != null) { + if (!client.write(address, event)) { + retryWrite(address, event); + } + outstandingEventsCount++; + } else { + logger.debug("Detected the condition of recovery from flume crash!"); + } + storedTuples++; + } + + if (storedTuples > 0) { + storage.flush(); + } + + t.commit(); + + if (storedTuples > 0) { /* log less frequently */ + logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}", + maxTuples, storedTuples, outstandingEventsCount); + } + } catch (Error er) { + t.rollback(); + throw er; + } catch (Exception ex) { + logger.error("Transaction Failed", ex); + if (ex instanceof NetletRuntimeException && client != null) { + try { + eventloop.disconnect(client); + } finally { + client = null; + outstandingEventsCount = 0; + } + } + t.rollback(); + return Status.BACKOFF; + } finally { + t.close(); + } + + if (storedTuples == 0) { + sleep(); + } + } + } + + return Status.READY; + } + + private void sleep() + { + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void start() + { + try { + if (storage instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage; + component.setup(null); + } + if (discovery instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery; + component.setup(null); + } + if (codec instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec; + component.setup(null); + } + eventloop = new DefaultEventLoop("EventLoop-" + id); + server = new Server(id, discovery,acceptedTolerance); + } catch (Error error) { + throw error; + } catch (RuntimeException re) { + throw re; + } 
catch (IOException ex) { + throw new RuntimeException(ex); + } + + eventloop.start(); + eventloop.start(hostname, port, server); + super.start(); + } + + @Override + public void stop() + { + try { + super.stop(); + } finally { + try { + if (client != null) { + eventloop.disconnect(client); + client = null; + } + + eventloop.stop(server); + eventloop.stop(); + + if (codec instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec; + component.teardown(); + } + if (discovery instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery; + component.teardown(); + } + if (storage instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage; + component.teardown(); + } + } catch (Throwable cause) { + throw new ServiceConfigurationError("Failed Stop", cause); + } + } + } + + /* End implementing Flume Sink interface */ + + /* Begin Configurable Interface */ + @Override + public void configure(Context context) + { + hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT); + port = context.getInteger("port", 0); + id = context.getString("id"); + if (id == null) { + id = getName(); + } + acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE); + sleepMillis = context.getLong("sleepMillis", 5L); + throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0; + maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000); + minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100); + commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE); + + @SuppressWarnings("unchecked") + Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context); + if (ldiscovery == null) { + logger.warn("Discovery agent not configured for the sink!"); + discovery = new Discovery<byte[]>() + { + @Override + public void unadvertise(Service<byte[]> service) + { + logger.debug("Sink {} stopped listening on {}:{}", service.getId(), service.getHost(), service.getPort()); + } + + @Override + public void advertise(Service<byte[]> service) + { + logger.debug("Sink {} started listening on {}:{}", service.getId(), service.getHost(), service.getPort()); + } + + @Override + @SuppressWarnings("unchecked") + public Collection<Service<byte[]>> discover() + { + return Collections.EMPTY_SET; + } + + }; + } else { + discovery = ldiscovery; + } + + storage = configure("storage", Storage.class, context); + if (storage == null) { + logger.warn("storage key missing... 
DTFlumeSink may lose data!"); + storage = new Storage() + { + @Override + public byte[] store(Slice slice) + { + return null; + } + + @Override + public byte[] retrieve(byte[] identifier) + { + return null; + } + + @Override + public byte[] retrieveNext() + { + return null; + } + + @Override + public void clean(byte[] identifier) + { + } + + @Override + public void flush() + { + } + + }; + } + + @SuppressWarnings("unchecked") + StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context); + if (lCodec == null) { + codec = new EventCodec(); + } else { + codec = lCodec; + } + + } + + /* End Configurable Interface */ + + @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"}) + private static <T> T configure(String key, Class<T> clazz, Context context) + { + String classname = context.getString(key); + if (classname == null) { + return null; + } + + try { + Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname); + if (clazz.isAssignableFrom(loadClass)) { + @SuppressWarnings("unchecked") + T object = (T)loadClass.newInstance(); + if (object instanceof Configurable) { + Context context1 = new Context(context.getSubProperties(key + '.')); + String id = context1.getString(Storage.ID); + if (id == null) { + id = context.getString(Storage.ID); + logger.debug("{} inherited id={} from sink", key, id); + context1.put(Storage.ID, id); + } + ((Configurable)object).configure(context1); + } + + return object; + } else { + logger.error("key class {} does not implement {} interface", classname, Storage.class.getCanonicalName()); + throw new Error("Invalid storage " + classname); + } + } catch (Error error) { + throw error; + } catch (RuntimeException re) { + throw re; + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + /** + * @return the hostname + */ + String getHostname() + { + return hostname; + } + + /** + * @param hostname the hostname to set + */ + void setHostname(String hostname) + { + this.hostname = hostname; + } + + /** + * @return the port + */ + int getPort() + { + return port; + } + + public long getAcceptedTolerance() + { + return acceptedTolerance; + } + + public void setAcceptedTolerance(long acceptedTolerance) + { + this.acceptedTolerance = acceptedTolerance; + } + + /** + * @param port the port to set + */ + void setPort(int port) + { + this.port = port; + } + + /** + * @return the discovery + */ + Discovery<byte[]> getDiscovery() + { + return discovery; + } + + /** + * @param discovery the discovery to set + */ + void setDiscovery(Discovery<byte[]> discovery) + { + this.discovery = discovery; + } + + /** + * Attempt the sequence of writing after sleeping twice and upon failure assume + * that the client connection has problems and hence close it. + * + * @param address + * @param e + * @throws IOException + */ + private void retryWrite(byte[] address, Slice event) throws IOException + { + if (event == null) { /* this happens for playback where address and event are sent as single object */ + while (client.isConnected()) { + sleep(); + if (client.write(address)) { + return; + } + } + } else { /* this happens when the events are taken from the flume channel and writing first time failed */ + while (client.isConnected()) { + sleep(); + if (client.write(address, event)) { + return; + } + } + } + + throw new IOException("Client disconnected!"); + } + + private static final Logger logger = LoggerFactory.getLogger(DTFlumeSink.class); +}

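For completeness, the sink is wired into a Flume agent through the properties parsed in configure() and listed in the class javadoc. The fragment below is a minimal sketch: the agent, sink, and channel names (agent1, dt, ch1) and the concrete host/port values are assumed for illustration; only the property keys and the class names come from this commit.

# assumed agent/sink/channel names; keys taken from DTFlumeSink.configure()
agent1.sinks = dt
agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
agent1.sinks.dt.channel = ch1
agent1.sinks.dt.id = sink1
agent1.sinks.dt.hostname = 127.0.0.1
agent1.sinks.dt.port = 8080
agent1.sinks.dt.sleepMillis = 5
agent1.sinks.dt.throughputAdjustmentPercent = 5
agent1.sinks.dt.minimumEventsPerTransaction = 100
agent1.sinks.dt.maximumEventsPerTransaction = 10000
agent1.sinks.dt.commitEventTimeoutMillis = 60000
# optional pluggable pieces; sub-properties under "discovery." / "codec." / "storage."
# are passed to the instantiated class, and the sink id is inherited when absent.
# Without a storage class the sink logs a warning that it may lose data.
agent1.sinks.dt.discovery = com.datatorrent.flume.discovery.ZKAssistedDiscovery
agent1.sinks.dt.codec = com.datatorrent.flume.storage.EventCodec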