http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java index e16fcab..50c151f 100644 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java +++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java @@ -17,8 +17,6 @@ */ package org.apache.distributedlog.exceptions; -import org.apache.distributedlog.thrift.service.StatusCode; - /** * An exception is thrown when a log writer attempts to write a record with out-of-order transaction id. */
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java index 637886e..50c21aa 100644 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java +++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java @@ -17,8 +17,6 @@ */ package org.apache.distributedlog.exceptions; -import org.apache.distributedlog.thrift.service.StatusCode; - /** * An {@code UnexpectedException} is thrown when encountering unexpected conditions. */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java index 01fab89..7eb6ed5 100644 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java +++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java @@ -17,8 +17,6 @@ */ package org.apache.distributedlog.exceptions; -import org.apache.distributedlog.thrift.service.StatusCode; - /** * Exception is thrown when found unsupported metadata version. */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java index d9001dd..8db1d50 100644 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java +++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java @@ -17,8 +17,6 @@ */ package org.apache.distributedlog.exceptions; -import org.apache.distributedlog.thrift.service.StatusCode; - /** * Signals when a write request is cancelled. */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java index 6899dbf..1d9c2a9 100644 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java +++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java @@ -17,8 +17,6 @@ */ package org.apache.distributedlog.exceptions; -import org.apache.distributedlog.thrift.service.StatusCode; - /** * An exception on writing log records. */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/ProtocolUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/ProtocolUtils.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/ProtocolUtils.java deleted file mode 100644 index 0a37d3a..0000000 --- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/ProtocolUtils.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.util; - -import static com.google.common.base.Charsets.UTF_8; - -import org.apache.distributedlog.DLSN; -import java.util.zip.CRC32; - -/** - * With CRC embedded in the application, we have to keep track of per api crc. Ideally this - * would be done by thrift. - */ -public class ProtocolUtils { - - // For request payload checksum - private static final ThreadLocal<CRC32> requestCRC = new ThreadLocal<CRC32>() { - @Override - protected CRC32 initialValue() { - return new CRC32(); - } - }; - - /** - * Generate crc32 for WriteOp. - */ - public static Long writeOpCRC32(String stream, byte[] payload) { - CRC32 crc = requestCRC.get(); - try { - crc.update(stream.getBytes(UTF_8)); - crc.update(payload); - return crc.getValue(); - } finally { - crc.reset(); - } - } - - /** - * Generate crc32 for TruncateOp. - */ - public static Long truncateOpCRC32(String stream, DLSN dlsn) { - CRC32 crc = requestCRC.get(); - try { - crc.update(stream.getBytes(UTF_8)); - crc.update(dlsn.serializeBytes()); - return crc.getValue(); - } finally { - crc.reset(); - } - } - - /** - * Generate crc32 for any op which only passes a stream name. - */ - public static Long streamOpCRC32(String stream) { - CRC32 crc = requestCRC.get(); - try { - crc.update(stream.getBytes(UTF_8)); - return crc.getValue(); - } finally { - crc.reset(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/resources/findbugsExclude.xml b/distributedlog-protocol/src/main/resources/findbugsExclude.xml index 55e50f6..5e1cd0e 100644 --- a/distributedlog-protocol/src/main/resources/findbugsExclude.xml +++ b/distributedlog-protocol/src/main/resources/findbugsExclude.xml @@ -17,10 +17,6 @@ //--> <FindBugsFilter> <Match> - <!-- generated code, we can't be held responsible for findbugs in it //--> - <Class name="~org\.apache\.distributedlog\.thrift.*" /> - </Match> - <Match> <!-- it is safe to store external bytes reference here. //--> <Class name="org.apache.distributedlog.LogRecord" /> <Bug pattern="EI_EXPOSE_REP2" /> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/thrift/service.thrift ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/thrift/service.thrift b/distributedlog-protocol/src/main/thrift/service.thrift deleted file mode 100644 index 45e1449..0000000 --- a/distributedlog-protocol/src/main/thrift/service.thrift +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -namespace java org.apache.distributedlog.thrift.service - -/* Response stats codes */ -enum StatusCode { - /* 2xx: action requested by the client was received, understood, accepted and processed successfully. */ - - /* standard response for successful requests. */ - SUCCESS = 200, - - /* 3xx: client must take additional action to complete the request. */ - - /* client closed. */ - CLIENT_CLOSED = 301, - /* found the stream in a different server, a redirection is required by client. */ - FOUND = 302, - - /* 4xx: client seems to have erred. */ - - /* request is denied for some reason */ - REQUEST_DENIED = 403, - /* request record too large */ - TOO_LARGE_RECORD = 413, - - /* 5xx: server failed to fulfill an apparently valid request. */ - - /* Generic error message, given when no more specific message is suitable. */ - INTERNAL_SERVER_ERROR = 500, - /* Not implemented */ - NOT_IMPLEMENTED = 501, - /* Already Closed Exception */ - ALREADY_CLOSED = 502, - /* Service is currently unavailable (because it is overloaded or down for maintenance). */ - SERVICE_UNAVAILABLE = 503, - /* Locking exception */ - LOCKING_EXCEPTION = 504, - /* ZooKeeper Errors */ - ZOOKEEPER_ERROR = 505, - /* Metadata exception */ - METADATA_EXCEPTION = 506, - /* BK Transmit Error */ - BK_TRANSMIT_ERROR = 507, - /* Flush timeout */ - FLUSH_TIMEOUT = 508, - /* Log empty */ - LOG_EMPTY = 509, - /* Log not found */ - LOG_NOT_FOUND = 510, - /* Truncated Transactions */ - TRUNCATED_TRANSACTION = 511, - /* End of Stream */ - END_OF_STREAM = 512, - /* Transaction Id Out of Order */ - TRANSACTION_OUT_OF_ORDER = 513, - /* Write exception */ - WRITE_EXCEPTION = 514, - /* Stream Unavailable */ - STREAM_UNAVAILABLE = 515, - /* Write cancelled exception */ - WRITE_CANCELLED_EXCEPTION = 516, - /* over-capacity/backpressure */ - OVER_CAPACITY = 517, - /** stream exists but is not ready (recovering etc.). - the difference between NOT_READY and UNAVAILABLE is that UNAVAILABLE - indicates the stream is no longer owned by the proxy and we should - redirect. NOT_READY indicates the stream exist at the proxy but isn't - eady for writes. */ - STREAM_NOT_READY = 518, - /* Region Unavailable */ - REGION_UNAVAILABLE = 519, - /* Invalid Enveloped Entry */ - INVALID_ENVELOPED_ENTRY = 520, - /* Unsupported metadata version */ - UNSUPPORTED_METADATA_VERSION = 521, - /* Log Already Exists */ - LOG_EXISTS = 522, - /* Checksum failed on the request */ - CHECKSUM_FAILED = 523, - /* Overcapacity: too many streams */ - TOO_MANY_STREAMS = 524, - /* Log Segment Not Found */ - LOG_SEGMENT_NOT_FOUND = 525, - /* End of Log Segment */ - END_OF_LOG_SEGMENT = 526, - /* Log Segment Is Truncated */ - LOG_SEGMENT_IS_TRUNCATED = 527, - - /* 6xx: unexpected */ - - UNEXPECTED = 600, - INTERRUPTED = 601, - INVALID_STREAM_NAME = 602, - ILLEGAL_STATE = 603, - - /* 10xx: reader exceptions */ - - RETRYABLE_READ = 1000, - LOG_READ_ERROR = 1001, - /* Read cancelled exception */ - READ_CANCELLED_EXCEPTION = 1002, -} - -/* Response Header */ -struct ResponseHeader { - 1: required StatusCode code; - 2: optional string errMsg; - 3: optional string location; -} - -/* Write Response */ -struct WriteResponse { - 1: required ResponseHeader header; - 2: optional string dlsn; -} - -/* Bulk write response */ -struct BulkWriteResponse { - 1: required ResponseHeader header; - 2: optional list<WriteResponse> writeResponses; -} - -/* Write Context */ -struct WriteContext { - 1: optional set<string> triedHosts; - 2: optional i64 crc32; - 3: optional bool isRecordSet; -} - -/* HeartBeat Options */ -struct HeartbeatOptions { - 1: optional bool sendHeartBeatToReader; -} - -/* Server Status */ -enum ServerStatus { - /* service is writing and accepting new streams */ - WRITE_AND_ACCEPT = 100, - /* service is only writing to old streams, not accepting new streams */ - WRITE_ONLY = 200, - /* service is shutting down, will not write */ - DOWN = 300, -} - -/* Server Info */ -struct ServerInfo { - 1: optional map<string, string> ownerships; - 2: optional ServerStatus serverStatus; -} - -/* Client Info */ -struct ClientInfo { - 1: optional string streamNameRegex; - 2: optional bool getOwnerships; -} - -service DistributedLogService { - - /* Deprecated */ - ServerInfo handshake(); - - ServerInfo handshakeWithClientInfo(ClientInfo clientInfo); - - /* Deprecated */ - WriteResponse heartbeat(string stream, WriteContext ctx); - - WriteResponse heartbeatWithOptions(string stream, WriteContext ctx, HeartbeatOptions options); - - /* Deprecated */ - WriteResponse write(string stream, binary data); - - WriteResponse writeWithContext(string stream, binary data, WriteContext ctx); - - BulkWriteResponse writeBulkWithContext(string stream, list<binary> data, WriteContext ctx); - - WriteResponse truncate(string stream, string dlsn, WriteContext ctx); - - WriteResponse release(string stream, WriteContext ctx); - - WriteResponse create(string stream, WriteContext ctx); - - WriteResponse delete(string stream, WriteContext ctx); - - WriteResponse getOwner(string stream, WriteContext ctx); - - /* Admin Methods */ - void setAcceptNewStream(bool enabled); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/pom.xml b/distributedlog-proxy-client/pom.xml new file mode 100644 index 0000000..7392d90 --- /dev/null +++ b/distributedlog-proxy-client/pom.xml @@ -0,0 +1,172 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> + <artifactId>distributedlog-proxy-client</artifactId> + <name>Apache DistributedLog :: Proxy Client</name> + <dependencies> + <dependency> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-proxy-protocol</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>finagle-core_2.11</artifactId> + <version>${finagle.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>finagle-thriftmux_2.11</artifactId> + <version>${finagle.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>finagle-serversets_2.11</artifactId> + <version>${finagle.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-protocol</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin.version}</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${maven-jar-plugin.version}</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine> + <forkMode>always</forkMode> + <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds> + <properties> + <property> + <name>listener</name> + <value>org.apache.distributedlog.TimedOutTestsListener</value> + </property> + </properties> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>${maven-checkstyle-plugin.version}</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>${puppycrawl.checkstyle.version}</version> + </dependency> + <dependency> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-build-tools</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <configuration> + <configLocation>distributedlog/checkstyle.xml</configLocation> + <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation> + <consoleOutput>true</consoleOutput> + <failOnViolation>true</failOnViolation> + <includeResources>false</includeResources> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + </configuration> + <executions> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java new file mode 100644 index 0000000..57e2b5a --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.TimeUnit; + +/** + * Client Config. + */ +public class ClientConfig { + int redirectBackoffStartMs = 25; + int redirectBackoffMaxMs = 100; + int maxRedirects = -1; + int requestTimeoutMs = -1; + boolean thriftmux = false; + boolean streamFailfast = false; + String streamNameRegex = ".*"; + boolean handshakeWithClientInfo = true; + long periodicHandshakeIntervalMs = TimeUnit.MINUTES.toMillis(5); + long periodicOwnershipSyncIntervalMs = TimeUnit.MINUTES.toMillis(5); + boolean periodicDumpOwnershipCacheEnabled = false; + long periodicDumpOwnershipCacheIntervalMs = TimeUnit.MINUTES.toMillis(10); + boolean enableHandshakeTracing = false; + boolean enableChecksum = true; + + public ClientConfig setMaxRedirects(int maxRedirects) { + this.maxRedirects = maxRedirects; + return this; + } + + public int getMaxRedirects() { + return this.maxRedirects; + } + + public ClientConfig setRequestTimeoutMs(int timeoutInMillis) { + this.requestTimeoutMs = timeoutInMillis; + return this; + } + + public int getRequestTimeoutMs() { + return this.requestTimeoutMs; + } + + public ClientConfig setRedirectBackoffStartMs(int ms) { + this.redirectBackoffStartMs = ms; + return this; + } + + public int getRedirectBackoffStartMs() { + return this.redirectBackoffStartMs; + } + + public ClientConfig setRedirectBackoffMaxMs(int ms) { + this.redirectBackoffMaxMs = ms; + return this; + } + + public int getRedirectBackoffMaxMs() { + return this.redirectBackoffMaxMs; + } + + public ClientConfig setThriftMux(boolean enabled) { + this.thriftmux = enabled; + return this; + } + + public boolean getThriftMux() { + return this.thriftmux; + } + + public ClientConfig setStreamFailfast(boolean enabled) { + this.streamFailfast = enabled; + return this; + } + + public boolean getStreamFailfast() { + return this.streamFailfast; + } + + public ClientConfig setStreamNameRegex(String nameRegex) { + checkNotNull(nameRegex); + this.streamNameRegex = nameRegex; + return this; + } + + public String getStreamNameRegex() { + return this.streamNameRegex; + } + + public ClientConfig setHandshakeWithClientInfo(boolean enabled) { + this.handshakeWithClientInfo = enabled; + return this; + } + + public boolean getHandshakeWithClientInfo() { + return this.handshakeWithClientInfo; + } + + public ClientConfig setPeriodicHandshakeIntervalMs(long intervalMs) { + this.periodicHandshakeIntervalMs = intervalMs; + return this; + } + + public long getPeriodicHandshakeIntervalMs() { + return this.periodicHandshakeIntervalMs; + } + + public ClientConfig setPeriodicOwnershipSyncIntervalMs(long intervalMs) { + this.periodicOwnershipSyncIntervalMs = intervalMs; + return this; + } + + public long getPeriodicOwnershipSyncIntervalMs() { + return this.periodicOwnershipSyncIntervalMs; + } + + public ClientConfig setPeriodicDumpOwnershipCacheEnabled(boolean enabled) { + this.periodicDumpOwnershipCacheEnabled = enabled; + return this; + } + + public boolean isPeriodicDumpOwnershipCacheEnabled() { + return this.periodicDumpOwnershipCacheEnabled; + } + + public ClientConfig setPeriodicDumpOwnershipCacheIntervalMs(long intervalMs) { + this.periodicDumpOwnershipCacheIntervalMs = intervalMs; + return this; + } + + public long getPeriodicDumpOwnershipCacheIntervalMs() { + return this.periodicDumpOwnershipCacheIntervalMs; + } + + public ClientConfig setHandshakeTracingEnabled(boolean enabled) { + this.enableHandshakeTracing = enabled; + return this; + } + + public boolean isHandshakeTracingEnabled() { + return this.enableHandshakeTracing; + } + + public ClientConfig setChecksumEnabled(boolean enabled) { + this.enableChecksum = enabled; + return this; + } + + public boolean isChecksumEnabled() { + return this.enableChecksum; + } + + public static ClientConfig newConfig(ClientConfig config) { + ClientConfig newConfig = new ClientConfig(); + newConfig.setMaxRedirects(config.getMaxRedirects()) + .setRequestTimeoutMs(config.getRequestTimeoutMs()) + .setRedirectBackoffStartMs(config.getRedirectBackoffStartMs()) + .setRedirectBackoffMaxMs(config.getRedirectBackoffMaxMs()) + .setThriftMux(config.getThriftMux()) + .setStreamFailfast(config.getStreamFailfast()) + .setStreamNameRegex(config.getStreamNameRegex()) + .setHandshakeWithClientInfo(config.getHandshakeWithClientInfo()) + .setPeriodicHandshakeIntervalMs(config.getPeriodicHandshakeIntervalMs()) + .setPeriodicDumpOwnershipCacheEnabled(config.isPeriodicDumpOwnershipCacheEnabled()) + .setPeriodicDumpOwnershipCacheIntervalMs(config.getPeriodicDumpOwnershipCacheIntervalMs()) + .setHandshakeTracingEnabled(config.isHandshakeTracingEnabled()) + .setChecksumEnabled(config.isChecksumEnabled()); + return newConfig; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java new file mode 100644 index 0000000..0ed93d0 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java @@ -0,0 +1,1199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecordSetBuffer; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.client.ownership.OwnershipCache; +import org.apache.distributedlog.client.proxy.ClusterClient; +import org.apache.distributedlog.client.proxy.HostProvider; +import org.apache.distributedlog.client.proxy.ProxyClient; +import org.apache.distributedlog.client.proxy.ProxyClientManager; +import org.apache.distributedlog.client.proxy.ProxyListener; +import org.apache.distributedlog.client.resolver.RegionResolver; +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.client.routing.RoutingService.RoutingContext; +import org.apache.distributedlog.client.stats.ClientStats; +import org.apache.distributedlog.client.stats.OpStats; +import org.apache.distributedlog.exceptions.DLClientClosedException; +import org.apache.distributedlog.exceptions.ServiceUnavailableException; +import org.apache.distributedlog.exceptions.StreamUnavailableException; +import org.apache.distributedlog.protocol.util.ProtocolUtils; +import org.apache.distributedlog.service.DLSocketAddress; +import org.apache.distributedlog.service.DistributedLogClient; +import org.apache.distributedlog.thrift.service.BulkWriteResponse; +import org.apache.distributedlog.thrift.service.HeartbeatOptions; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.thrift.service.ServerInfo; +import org.apache.distributedlog.thrift.service.ServerStatus; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteContext; +import org.apache.distributedlog.thrift.service.WriteResponse; +import com.twitter.finagle.CancelledRequestException; +import com.twitter.finagle.ConnectionFailedException; +import com.twitter.finagle.Failure; +import com.twitter.finagle.NoBrokersAvailableException; +import com.twitter.finagle.RequestTimeoutException; +import com.twitter.finagle.ServiceException; +import com.twitter.finagle.ServiceTimeoutException; +import com.twitter.finagle.WriteException; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.stats.StatsReceiver; +import com.twitter.finagle.thrift.ClientId; +import com.twitter.util.Duration; +import com.twitter.util.Function; +import com.twitter.util.Function0; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import com.twitter.util.Return; +import com.twitter.util.Throw; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.thrift.TApplicationException; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Seq; +import scala.runtime.AbstractFunction1; + + +/** + * Implementation of distributedlog client. + */ +public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient, + RoutingService.RoutingListener, ProxyListener, HostProvider { + + private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class); + + private final String clientName; + private final ClientId clientId; + private final ClientConfig clientConfig; + private final RoutingService routingService; + private final ProxyClient.Builder clientBuilder; + private final boolean streamFailfast; + private final Pattern streamNameRegexPattern; + + // Timer + private final HashedWheelTimer dlTimer; + + // region resolver + private final RegionResolver regionResolver; + + // Ownership maintenance + private final OwnershipCache ownershipCache; + // Channel/Client management + private final ProxyClientManager clientManager; + // Cluster Client (for routing service) + private final Optional<ClusterClient> clusterClient; + + // Close Status + private boolean closed = false; + private final ReentrantReadWriteLock closeLock = + new ReentrantReadWriteLock(); + + abstract class StreamOp implements TimerTask { + final String stream; + + final AtomicInteger tries = new AtomicInteger(0); + final RoutingContext routingContext = RoutingContext.of(regionResolver); + final WriteContext ctx = new WriteContext(); + final Stopwatch stopwatch; + final OpStats opStats; + SocketAddress nextAddressToSend; + + StreamOp(final String stream, final OpStats opStats) { + this.stream = stream; + this.stopwatch = Stopwatch.createStarted(); + this.opStats = opStats; + } + + boolean shouldTimeout() { + long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); + return shouldTimeout(elapsedMs); + } + + boolean shouldTimeout(long elapsedMs) { + return clientConfig.getRequestTimeoutMs() > 0 + && elapsedMs >= clientConfig.getRequestTimeoutMs(); + } + + void send(SocketAddress address) { + long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); + if (clientConfig.getMaxRedirects() > 0 + && tries.get() >= clientConfig.getMaxRedirects()) { + fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs), + "Exhausted max redirects in " + elapsedMs + " ms")); + return; + } else if (shouldTimeout(elapsedMs)) { + fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs), + "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs() + + " in " + elapsedMs + " ms")); + return; + } + synchronized (this) { + String addrStr = address.toString(); + if (ctx.isSetTriedHosts() && ctx.getTriedHosts().contains(addrStr)) { + nextAddressToSend = address; + dlTimer.newTimeout(this, + Math.min(clientConfig.getRedirectBackoffMaxMs(), + tries.get() * clientConfig.getRedirectBackoffStartMs()), + TimeUnit.MILLISECONDS); + } else { + doSend(address); + } + } + } + + abstract Future<ResponseHeader> sendRequest(ProxyClient sc); + + void doSend(SocketAddress address) { + ctx.addToTriedHosts(address.toString()); + if (clientConfig.isChecksumEnabled()) { + Long crc32 = computeChecksum(); + if (null != crc32) { + ctx.setCrc32(crc32); + } + } + tries.incrementAndGet(); + sendWriteRequest(address, this); + } + + void beforeComplete(ProxyClient sc, ResponseHeader responseHeader) { + ownershipCache.updateOwner(stream, sc.getAddress()); + } + + void complete(SocketAddress address) { + stopwatch.stop(); + opStats.completeRequest(address, + stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get()); + } + + void fail(SocketAddress address, Throwable t) { + stopwatch.stop(); + opStats.failRequest(address, + stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get()); + } + + Long computeChecksum() { + return null; + } + + @Override + public synchronized void run(Timeout timeout) throws Exception { + if (!timeout.isCancelled() && null != nextAddressToSend) { + doSend(nextAddressToSend); + } else { + fail(null, new CancelledRequestException()); + } + } + } + + class BulkWriteOp extends StreamOp { + + final List<ByteBuffer> data; + final ArrayList<Promise<DLSN>> results; + + BulkWriteOp(final String name, final List<ByteBuffer> data) { + super(name, clientStats.getOpStats("bulk_write")); + this.data = data; + + // This could take a while (relatively speaking) for very large inputs. We probably don't want + // to go so large for other reasons though. + this.results = new ArrayList<Promise<DLSN>>(data.size()); + for (int i = 0; i < data.size(); i++) { + checkNotNull(data.get(i)); + this.results.add(new Promise<DLSN>()); + } + } + + @Override + Future<ResponseHeader> sendRequest(final ProxyClient sc) { + return sc.getService().writeBulkWithContext(stream, data, ctx) + .addEventListener(new FutureEventListener<BulkWriteResponse>() { + @Override + public void onSuccess(BulkWriteResponse response) { + // For non-success case, the ResponseHeader handler (the caller) will handle it. + // Note success in this case means no finagle errors have occurred + // (such as finagle connection issues). In general code != SUCCESS means there's some error + // reported by dlog service. The caller will handle such errors. + if (response.getHeader().getCode() == StatusCode.SUCCESS) { + beforeComplete(sc, response.getHeader()); + BulkWriteOp.this.complete(sc.getAddress(), response); + if (response.getWriteResponses().size() == 0 && data.size() > 0) { + logger.error("non-empty bulk write got back empty response without failure for stream {}", + stream); + } + } + } + @Override + public void onFailure(Throwable cause) { + // Handled by the ResponseHeader listener (attached by the caller). + } + }).map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() { + @Override + public ResponseHeader apply(BulkWriteResponse response) { + // We need to return the ResponseHeader to the caller's listener to process DLOG errors. + return response.getHeader(); + } + }); + } + + void complete(SocketAddress address, BulkWriteResponse bulkWriteResponse) { + super.complete(address); + Iterator<WriteResponse> writeResponseIterator = bulkWriteResponse.getWriteResponses().iterator(); + Iterator<Promise<DLSN>> resultIterator = results.iterator(); + + // Fill in errors from thrift responses. + while (resultIterator.hasNext() && writeResponseIterator.hasNext()) { + Promise<DLSN> result = resultIterator.next(); + WriteResponse writeResponse = writeResponseIterator.next(); + if (StatusCode.SUCCESS == writeResponse.getHeader().getCode()) { + result.setValue(DLSN.deserialize(writeResponse.getDlsn())); + } else { + result.setException(ProtocolUtils.exception(writeResponse.getHeader())); + } + } + + // Should never happen, but just in case so there's some record. + if (bulkWriteResponse.getWriteResponses().size() != data.size()) { + logger.error("wrong number of results, response = {} records = {}", + bulkWriteResponse.getWriteResponses().size(), data.size()); + } + } + + @Override + void fail(SocketAddress address, Throwable t) { + + // StreamOp.fail is called to fail the overall request. In case of BulkWriteOp we take the request level + // exception to apply to the first write. In fact for request level exceptions no request has ever been + // attempted, but logically we associate the error with the first write. + super.fail(address, t); + Iterator<Promise<DLSN>> resultIterator = results.iterator(); + + // Fail the first write with the batch level failure. + if (resultIterator.hasNext()) { + Promise<DLSN> result = resultIterator.next(); + result.setException(t); + } + + // Fail the remaining writes as cancelled requests. + while (resultIterator.hasNext()) { + Promise<DLSN> result = resultIterator.next(); + result.setException(new CancelledRequestException()); + } + } + + @SuppressWarnings("unchecked") + List<Future<DLSN>> result() { + return (List) results; + } + } + + abstract class AbstractWriteOp extends StreamOp { + + final Promise<WriteResponse> result = new Promise<WriteResponse>(); + Long crc32 = null; + + AbstractWriteOp(final String name, final OpStats opStats) { + super(name, opStats); + } + + void complete(SocketAddress address, WriteResponse response) { + super.complete(address); + result.setValue(response); + } + + @Override + void fail(SocketAddress address, Throwable t) { + super.fail(address, t); + result.setException(t); + } + + @Override + Long computeChecksum() { + if (null == crc32) { + crc32 = ProtocolUtils.streamOpCRC32(stream); + } + return crc32; + } + + @Override + Future<ResponseHeader> sendRequest(final ProxyClient sc) { + return this.sendWriteRequest(sc).addEventListener(new FutureEventListener<WriteResponse>() { + @Override + public void onSuccess(WriteResponse response) { + if (response.getHeader().getCode() == StatusCode.SUCCESS) { + beforeComplete(sc, response.getHeader()); + AbstractWriteOp.this.complete(sc.getAddress(), response); + } + } + @Override + public void onFailure(Throwable cause) { + // handled by the ResponseHeader listener + } + }).map(new AbstractFunction1<WriteResponse, ResponseHeader>() { + @Override + public ResponseHeader apply(WriteResponse response) { + return response.getHeader(); + } + }); + } + + abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc); + } + + class WriteOp extends AbstractWriteOp { + final ByteBuffer data; + + WriteOp(final String name, final ByteBuffer data) { + super(name, clientStats.getOpStats("write")); + this.data = data; + } + + @Override + Future<WriteResponse> sendWriteRequest(ProxyClient sc) { + return sc.getService().writeWithContext(stream, data, ctx); + } + + @Override + Long computeChecksum() { + if (null == crc32) { + byte[] dataBytes = new byte[data.remaining()]; + data.duplicate().get(dataBytes); + crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes); + } + return crc32; + } + + Future<DLSN> result() { + return result.map(new AbstractFunction1<WriteResponse, DLSN>() { + @Override + public DLSN apply(WriteResponse response) { + return DLSN.deserialize(response.getDlsn()); + } + }); + } + } + + class TruncateOp extends AbstractWriteOp { + final DLSN dlsn; + + TruncateOp(String name, DLSN dlsn) { + super(name, clientStats.getOpStats("truncate")); + this.dlsn = dlsn; + } + + @Override + Long computeChecksum() { + if (null == crc32) { + crc32 = ProtocolUtils.truncateOpCRC32(stream, dlsn); + } + return crc32; + } + + @Override + Future<WriteResponse> sendWriteRequest(ProxyClient sc) { + return sc.getService().truncate(stream, dlsn.serialize(), ctx); + } + + Future<Boolean> result() { + return result.map(new AbstractFunction1<WriteResponse, Boolean>() { + @Override + public Boolean apply(WriteResponse response) { + return true; + } + }); + } + } + + class WriteRecordSetOp extends WriteOp { + + WriteRecordSetOp(String name, LogRecordSetBuffer recordSet) { + super(name, recordSet.getBuffer()); + ctx.setIsRecordSet(true); + } + + } + + + class ReleaseOp extends AbstractWriteOp { + + ReleaseOp(String name) { + super(name, clientStats.getOpStats("release")); + } + + @Override + Future<WriteResponse> sendWriteRequest(ProxyClient sc) { + return sc.getService().release(stream, ctx); + } + + @Override + void beforeComplete(ProxyClient sc, ResponseHeader header) { + ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted"); + } + + Future<Void> result() { + return result.map(new AbstractFunction1<WriteResponse, Void>() { + @Override + public Void apply(WriteResponse response) { + return null; + } + }); + } + } + + class DeleteOp extends AbstractWriteOp { + + DeleteOp(String name) { + super(name, clientStats.getOpStats("delete")); + } + + @Override + Future<WriteResponse> sendWriteRequest(ProxyClient sc) { + return sc.getService().delete(stream, ctx); + } + + @Override + void beforeComplete(ProxyClient sc, ResponseHeader header) { + ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted"); + } + + Future<Void> result() { + return result.map(new AbstractFunction1<WriteResponse, Void>() { + @Override + public Void apply(WriteResponse v1) { + return null; + } + }); + } + } + + class CreateOp extends AbstractWriteOp { + + CreateOp(String name) { + super(name, clientStats.getOpStats("create")); + } + + @Override + Future<WriteResponse> sendWriteRequest(ProxyClient sc) { + return sc.getService().create(stream, ctx); + } + + @Override + void beforeComplete(ProxyClient sc, ResponseHeader header) { + ownershipCache.updateOwner(stream, sc.getAddress()); + } + + Future<Void> result() { + return result.map(new AbstractFunction1<WriteResponse, Void>() { + @Override + public Void apply(WriteResponse v1) { + return null; + } + }).voided(); + } + } + + class HeartbeatOp extends AbstractWriteOp { + HeartbeatOptions options; + + HeartbeatOp(String name, boolean sendReaderHeartBeat) { + super(name, clientStats.getOpStats("heartbeat")); + options = new HeartbeatOptions(); + options.setSendHeartBeatToReader(sendReaderHeartBeat); + } + + @Override + Future<WriteResponse> sendWriteRequest(ProxyClient sc) { + return sc.getService().heartbeatWithOptions(stream, ctx, options); + } + + Future<Void> result() { + return result.map(new AbstractFunction1<WriteResponse, Void>() { + @Override + public Void apply(WriteResponse response) { + return null; + } + }); + } + } + + // Stats + private final ClientStats clientStats; + + public DistributedLogClientImpl(String name, + ClientId clientId, + RoutingService routingService, + ClientBuilder clientBuilder, + ClientConfig clientConfig, + Optional<ClusterClient> clusterClient, + StatsReceiver statsReceiver, + StatsReceiver streamStatsReceiver, + RegionResolver regionResolver, + boolean enableRegionStats) { + this.clientName = name; + this.clientId = clientId; + this.routingService = routingService; + this.clientConfig = clientConfig; + this.streamFailfast = clientConfig.getStreamFailfast(); + this.streamNameRegexPattern = Pattern.compile(clientConfig.getStreamNameRegex()); + this.regionResolver = regionResolver; + // Build the timer + this.dlTimer = new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("DLClient-" + name + "-timer-%d").build(), + this.clientConfig.getRedirectBackoffStartMs(), + TimeUnit.MILLISECONDS); + // register routing listener + this.routingService.registerListener(this); + // build the ownership cache + this.ownershipCache = new OwnershipCache(this.clientConfig, this.dlTimer, statsReceiver, streamStatsReceiver); + // Client Stats + this.clientStats = new ClientStats(statsReceiver, enableRegionStats, regionResolver); + // Client Manager + this.clientBuilder = ProxyClient.newBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats); + this.clientManager = new ProxyClientManager( + this.clientConfig, // client config + this.clientBuilder, // client builder + this.dlTimer, // timer + this, // host provider + clientStats); // client stats + this.clusterClient = clusterClient; + this.clientManager.registerProxyListener(this); + + // Cache Stats + StatsReceiver cacheStatReceiver = statsReceiver.scope("cache"); + Seq<String> numCachedStreamsGaugeName = + scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_streams")).toList(); + cacheStatReceiver.provideGauge(numCachedStreamsGaugeName, new Function0<Object>() { + @Override + public Object apply() { + return (float) ownershipCache.getNumCachedStreams(); + } + }); + Seq<String> numCachedHostsGaugeName = + scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_hosts")).toList(); + cacheStatReceiver.provideGauge(numCachedHostsGaugeName, new Function0<Object>() { + @Override + public Object apply() { + return (float) clientManager.getNumProxies(); + } + }); + + logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {}," + + " stats_receiver = {}, thriftmux = {}", + new Object[] { + name, + clientId, + routingService.getClass(), + statsReceiver.getClass(), + clientConfig.getThriftMux() + }); + } + + @Override + public Set<SocketAddress> getHosts() { + Set<SocketAddress> hosts = Sets.newHashSet(); + // if using server side routing, we only handshake with the hosts in ownership cache. + if (!clusterClient.isPresent()) { + hosts.addAll(this.routingService.getHosts()); + } + hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet()); + return hosts; + } + + @Override + public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) { + if (null != serverInfo + && serverInfo.isSetServerStatus() + && ServerStatus.DOWN == serverInfo.getServerStatus()) { + logger.info("{} is detected as DOWN during handshaking", address); + // server is shutting down + handleServiceUnavailable(address, client, Optional.<StreamOp>absent()); + return; + } + + if (null != serverInfo && serverInfo.isSetOwnerships()) { + Map<String, String> ownerships = serverInfo.getOwnerships(); + logger.debug("Handshaked with {} : {} ownerships returned.", address, ownerships.size()); + for (Map.Entry<String, String> entry : ownerships.entrySet()) { + Matcher matcher = streamNameRegexPattern.matcher(entry.getKey()); + if (!matcher.matches()) { + continue; + } + updateOwnership(entry.getKey(), entry.getValue()); + } + } else { + logger.debug("Handshaked with {} : no ownerships returned", address); + } + } + + @Override + public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) { + cause = showRootCause(Optional.<StreamOp>absent(), cause); + handleRequestException(address, client, Optional.<StreamOp>absent(), cause); + } + + @VisibleForTesting + public void handshake() { + clientManager.handshake(); + logger.info("Handshaked with {} hosts, cached {} streams", + clientManager.getNumProxies(), ownershipCache.getNumCachedStreams()); + } + + @Override + public void onServerLeft(SocketAddress address) { + onServerLeft(address, null); + } + + private void onServerLeft(SocketAddress address, ProxyClient sc) { + ownershipCache.removeAllStreamsFromOwner(address); + if (null == sc) { + clientManager.removeClient(address); + } else { + clientManager.removeClient(address, sc); + } + } + + @Override + public void onServerJoin(SocketAddress address) { + // we only pre-create connection for client-side routing + // if it is server side routing, we only know the exact proxy address + // when #getOwner. + if (!clusterClient.isPresent()) { + clientManager.createClient(address); + } + } + + public void close() { + closeLock.writeLock().lock(); + try { + if (closed) { + return; + } + closed = true; + } finally { + closeLock.writeLock().unlock(); + } + clientManager.close(); + routingService.unregisterListener(this); + routingService.stopService(); + dlTimer.stop(); + } + + @Override + public Future<Void> check(String stream) { + final HeartbeatOp op = new HeartbeatOp(stream, false); + sendRequest(op); + return op.result(); + } + + @Override + public Future<Void> heartbeat(String stream) { + final HeartbeatOp op = new HeartbeatOp(stream, true); + sendRequest(op); + return op.result(); + } + + @Override + public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() { + return ownershipCache.getStreamOwnershipDistribution(); + } + + @Override + public Future<Void> setAcceptNewStream(boolean enabled) { + Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients(); + List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size()); + for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) { + futures.add(entry.getValue().getService().setAcceptNewStream(enabled)); + } + return Future.collect(futures).map(new Function<List<Void>, Void>() { + @Override + public Void apply(List<Void> list) { + return null; + } + }); + } + + @Override + public Future<DLSN> write(String stream, ByteBuffer data) { + final WriteOp op = new WriteOp(stream, data); + sendRequest(op); + return op.result(); + } + + @Override + public Future<DLSN> writeRecordSet(String stream, final LogRecordSetBuffer recordSet) { + final WriteRecordSetOp op = new WriteRecordSetOp(stream, recordSet); + sendRequest(op); + return op.result(); + } + + @Override + public List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data) { + if (data.size() > 0) { + final BulkWriteOp op = new BulkWriteOp(stream, data); + sendRequest(op); + return op.result(); + } else { + return Collections.emptyList(); + } + } + + @Override + public Future<Boolean> truncate(String stream, DLSN dlsn) { + final TruncateOp op = new TruncateOp(stream, dlsn); + sendRequest(op); + return op.result(); + } + + @Override + public Future<Void> delete(String stream) { + final DeleteOp op = new DeleteOp(stream); + sendRequest(op); + return op.result(); + } + + @Override + public Future<Void> release(String stream) { + final ReleaseOp op = new ReleaseOp(stream); + sendRequest(op); + return op.result(); + } + + @Override + public Future<Void> create(String stream) { + final CreateOp op = new CreateOp(stream); + sendRequest(op); + return op.result(); + } + + private void sendRequest(final StreamOp op) { + closeLock.readLock().lock(); + try { + if (closed) { + op.fail(null, new DLClientClosedException("Client " + clientName + " is closed.")); + } else { + doSend(op, null); + } + } finally { + closeLock.readLock().unlock(); + } + } + + /** + * Send the stream operation by routing service, excluding previous address if it is not null. + * + * @param op + * stream operation. + * @param previousAddr + * previous tried address. + */ + private void doSend(final StreamOp op, final SocketAddress previousAddr) { + if (null != previousAddr) { + op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION); + } + // Get host first + final SocketAddress address = ownershipCache.getOwner(op.stream); + if (null == address || op.routingContext.isTriedHost(address)) { + getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() { + @Override + public void onFailure(Throwable cause) { + op.fail(null, cause); + } + + @Override + public void onSuccess(SocketAddress ownerAddr) { + op.send(ownerAddr); + } + }); + } else { + op.send(address); + } + } + + private void retryGetOwnerFromResourcePlacementServer(final StreamOp op, + final Promise<SocketAddress> getOwnerPromise, + final Throwable cause) { + if (op.shouldTimeout()) { + op.fail(null, cause); + return; + } + getOwnerFromResourcePlacementServer(op, getOwnerPromise); + } + + private void getOwnerFromResourcePlacementServer(final StreamOp op, + final Promise<SocketAddress> getOwnerPromise) { + clusterClient.get().getService().getOwner(op.stream, op.ctx) + .addEventListener(new FutureEventListener<WriteResponse>() { + @Override + public void onFailure(Throwable cause) { + getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause)); + } + + @Override + public void onSuccess(WriteResponse value) { + if (StatusCode.FOUND == value.getHeader().getCode() + && null != value.getHeader().getLocation()) { + try { + InetSocketAddress addr = DLSocketAddress.deserialize( + value.getHeader().getLocation() + ).getSocketAddress(); + getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr)); + } catch (IOException e) { + // retry from the routing server again + logger.error("ERROR in getOwner", e); + retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e); + return; + } + } else { + // retry from the routing server again + retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, + new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown")); + } + } + }); + } + + private Future<SocketAddress> getOwner(final StreamOp op) { + if (clusterClient.isPresent()) { + final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>(); + getOwnerFromResourcePlacementServer(op, getOwnerPromise); + return getOwnerPromise; + } + // pickup host by hashing + try { + return Future.value(routingService.getHost(op.stream, op.routingContext)); + } catch (NoBrokersAvailableException nbae) { + return Future.exception(nbae); + } + } + + private void sendWriteRequest(final SocketAddress addr, final StreamOp op) { + // Get corresponding finagle client + final ProxyClient sc = clientManager.getClient(addr); + final long startTimeNanos = System.nanoTime(); + // write the request to that host. + op.sendRequest(sc).addEventListener(new FutureEventListener<ResponseHeader>() { + @Override + public void onSuccess(ResponseHeader header) { + if (logger.isDebugEnabled()) { + logger.debug("Received response; header: {}", header); + } + clientStats.completeProxyRequest(addr, header.getCode(), startTimeNanos); + // update routing context + op.routingContext.addTriedHost(addr, header.getCode()); + switch (header.getCode()) { + case SUCCESS: + // success handling is done per stream op + break; + case FOUND: + handleRedirectResponse(header, op, addr); + break; + // for overcapacity, dont report failure since this normally happens quite a bit + case OVER_CAPACITY: + logger.debug("Failed to write request to {} : {}", op.stream, header); + op.fail(addr, ProtocolUtils.exception(header)); + break; + // for responses that indicate the requests definitely failed, + // we should fail them immediately (e.g. TOO_LARGE_RECORD, METADATA_EXCEPTION) + case NOT_IMPLEMENTED: + case METADATA_EXCEPTION: + case LOG_EMPTY: + case LOG_NOT_FOUND: + case TRUNCATED_TRANSACTION: + case END_OF_STREAM: + case TRANSACTION_OUT_OF_ORDER: + case INVALID_STREAM_NAME: + case REQUEST_DENIED: + case TOO_LARGE_RECORD: + case CHECKSUM_FAILED: + // status code NOT_READY is returned if failfast is enabled in the server. don't redirect + // since the proxy may still own the stream. + case STREAM_NOT_READY: + op.fail(addr, ProtocolUtils.exception(header)); + break; + case SERVICE_UNAVAILABLE: + handleServiceUnavailable(addr, sc, Optional.of(op)); + break; + case REGION_UNAVAILABLE: + // region is unavailable, redirect the request to hosts in other region + redirect(op, null); + break; + // Proxy was overloaded and refused to try to acquire the stream. Don't remove ownership, since + // we didn't have it in the first place. + case TOO_MANY_STREAMS: + handleRedirectableError(addr, op, header); + break; + case STREAM_UNAVAILABLE: + case ZOOKEEPER_ERROR: + case LOCKING_EXCEPTION: + case UNEXPECTED: + case INTERRUPTED: + case BK_TRANSMIT_ERROR: + case FLUSH_TIMEOUT: + default: + // when we are receiving these exceptions from proxy, it means proxy or the stream is closed + // redirect the request. + ownershipCache.removeOwnerFromStream(op.stream, addr, header.getCode().name()); + handleRedirectableError(addr, op, header); + break; + } + } + + @Override + public void onFailure(Throwable cause) { + Optional<StreamOp> opOptional = Optional.of(op); + cause = showRootCause(opOptional, cause); + clientStats.failProxyRequest(addr, cause, startTimeNanos); + handleRequestException(addr, sc, opOptional, cause); + } + }); + } + + // Response Handlers + + Throwable showRootCause(Optional<StreamOp> op, Throwable cause) { + if (cause instanceof Failure) { + Failure failure = (Failure) cause; + if (failure.isFlagged(Failure.Wrapped())) { + try { + // if it is a wrapped failure, unwrap it first + cause = failure.show(); + } catch (IllegalArgumentException iae) { + if (op.isPresent()) { + logger.warn("Failed to unwrap finagle failure of stream {} : ", op.get().stream, iae); + } else { + logger.warn("Failed to unwrap finagle failure : ", iae); + } + } + } + } + return cause; + } + + private void handleRedirectableError(SocketAddress addr, + StreamOp op, + ResponseHeader header) { + if (streamFailfast) { + op.fail(addr, ProtocolUtils.exception(header)); + } else { + redirect(op, null); + } + } + + void handleServiceUnavailable(SocketAddress addr, + ProxyClient sc, + Optional<StreamOp> op) { + // service is unavailable, remove it out of routing service + routingService.removeHost(addr, new ServiceUnavailableException(addr + " is unavailable now.")); + onServerLeft(addr); + if (op.isPresent()) { + ownershipCache.removeOwnerFromStream(op.get().stream, addr, addr + " is unavailable now."); + // redirect the request to other host. + redirect(op.get(), null); + } + } + + void handleRequestException(SocketAddress addr, + ProxyClient sc, + Optional<StreamOp> op, + Throwable cause) { + boolean resendOp = false; + boolean removeOwnerFromStream = false; + SocketAddress previousAddr = addr; + String reason = cause.getMessage(); + if (cause instanceof ConnectionFailedException || cause instanceof java.net.ConnectException) { + routingService.removeHost(addr, cause); + onServerLeft(addr, sc); + removeOwnerFromStream = true; + // redirect the request to other host. + resendOp = true; + } else if (cause instanceof ChannelException) { + // java.net.ConnectException typically means connection is refused remotely + // no process listening on remote address/port. + if (cause.getCause() instanceof java.net.ConnectException) { + routingService.removeHost(addr, cause.getCause()); + onServerLeft(addr); + reason = cause.getCause().getMessage(); + } else { + routingService.removeHost(addr, cause); + reason = cause.getMessage(); + } + removeOwnerFromStream = true; + // redirect the request to other host. + resendOp = true; + } else if (cause instanceof ServiceTimeoutException) { + // redirect the request to itself again, which will backoff for a while + resendOp = true; + previousAddr = null; + } else if (cause instanceof WriteException) { + // redirect the request to other host. + resendOp = true; + } else if (cause instanceof ServiceException) { + // redirect the request to other host. + clientManager.removeClient(addr, sc); + resendOp = true; + } else if (cause instanceof TApplicationException) { + handleTApplicationException(cause, op, addr, sc); + } else if (cause instanceof Failure) { + handleFinagleFailure((Failure) cause, op, addr); + } else { + // Default handler + handleException(cause, op, addr); + } + + if (op.isPresent()) { + if (removeOwnerFromStream) { + ownershipCache.removeOwnerFromStream(op.get().stream, addr, reason); + } + if (resendOp) { + doSend(op.get(), previousAddr); + } + } + } + + /** + * Redirect the request to new proxy <i>newAddr</i>. If <i>newAddr</i> is null, + * it would pick up a host from routing service. + * + * @param op + * stream operation + * @param newAddr + * new proxy address + */ + void redirect(StreamOp op, SocketAddress newAddr) { + ownershipCache.getOwnershipStatsLogger().onRedirect(op.stream); + if (null != newAddr) { + logger.debug("Redirect request {} to new owner {}.", op, newAddr); + op.send(newAddr); + } else { + doSend(op, null); + } + } + + void handleFinagleFailure(Failure failure, + Optional<StreamOp> op, + SocketAddress addr) { + if (failure.isFlagged(Failure.Restartable())) { + if (op.isPresent()) { + // redirect the request to other host + doSend(op.get(), addr); + } + } else { + // fail the request if it is other types of failures + handleException(failure, op, addr); + } + } + + void handleException(Throwable cause, + Optional<StreamOp> op, + SocketAddress addr) { + // RequestTimeoutException: fail it and let client decide whether to retry or not. + + // FailedFastException: + // We don't actually know when FailedFastException will be thrown + // so properly we just throw it back to application to let application + // handle it. + + // Other Exceptions: as we don't know how to handle them properly so throw them to client + if (op.isPresent()) { + logger.error("Failed to write request to {} @ {} : {}", + new Object[]{op.get().stream, addr, cause.toString()}); + op.get().fail(addr, cause); + } + } + + void handleTApplicationException(Throwable cause, + Optional<StreamOp> op, + SocketAddress addr, + ProxyClient sc) { + TApplicationException ex = (TApplicationException) cause; + if (ex.getType() == TApplicationException.UNKNOWN_METHOD) { + // if we encountered unknown method exception on thrift server, it means this proxy + // has problem. we should remove it from routing service, clean up ownerships + routingService.removeHost(addr, cause); + onServerLeft(addr, sc); + if (op.isPresent()) { + ownershipCache.removeOwnerFromStream(op.get().stream, addr, cause.getMessage()); + doSend(op.get(), addr); + } + } else { + handleException(cause, op, addr); + } + } + + void handleRedirectResponse(ResponseHeader header, StreamOp op, SocketAddress curAddr) { + SocketAddress ownerAddr = null; + if (header.isSetLocation()) { + String owner = header.getLocation(); + try { + ownerAddr = DLSocketAddress.deserialize(owner).getSocketAddress(); + // if we are receiving a direct request to same host, we won't try the same host. + // as the proxy will shut itself down if it redirects client to itself. + if (curAddr.equals(ownerAddr)) { + logger.warn("Request to stream {} is redirected to same server {}!", op.stream, curAddr); + ownerAddr = null; + } else { + // update ownership when redirects. + ownershipCache.updateOwner(op.stream, ownerAddr); + } + } catch (IOException e) { + ownerAddr = null; + } + } + redirect(op, ownerAddr); + } + + void updateOwnership(String stream, String location) { + try { + SocketAddress ownerAddr = DLSocketAddress.deserialize(location).getSocketAddress(); + // update ownership + ownershipCache.updateOwner(stream, ownerAddr); + } catch (IOException e) { + logger.warn("Invalid ownership {} found for stream {} : ", + new Object[] { location, stream, e }); + } + } + +}