/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.client;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordSet;
import org.apache.distributedlog.LogRecordSetBuffer;
import org.apache.distributedlog.client.speculative.DefaultSpeculativeRequestExecutionPolicy;
import org.apache.distributedlog.client.speculative.SpeculativeRequestExecutionPolicy;
import org.apache.distributedlog.client.speculative.SpeculativeRequestExecutor;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.service.DistributedLogClient;
import com.twitter.finagle.IndividualRequestTimeoutException;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Write to multiple streams.
 *
 * <p>Records handed to {@link #write(ByteBuffer)} are appended to an in-memory record set.
 * The record set is transmitted when it reaches the configured buffer size, when appending
 * the next record would exceed {@code MAX_LOGRECORDSET_SIZE}, or when the periodic flush
 * task runs (this object is scheduled as a {@link Runnable}). Each record set is sent to one
 * of the configured streams; further streams are tried on failure or speculatively, until a
 * write succeeds, all streams were tried, or the request timeout elapses.
 */
public class DistributedLogMultiStreamWriter implements Runnable {

    /**
     * Create a new builder to create a multi stream writer.
     *
     * @return a new builder to create a multi stream writer.
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Builder for the multi stream writer.
     */
    public static class Builder {

        private DistributedLogClient client = null;
        private List<String> streams = null;
        private int bufferSize = 16 * 1024; // 16k
        private long flushIntervalMicros = 2000; // 2ms
        private CompressionCodec.Type codec = CompressionCodec.Type.NONE;
        private ScheduledExecutorService executorService = null;
        private long requestTimeoutMs = 500; // 500ms
        private int firstSpeculativeTimeoutMs = 50; // 50ms
        private int maxSpeculativeTimeoutMs = 200; // 200ms
        private float speculativeBackoffMultiplier = 2;
        private Ticker ticker = Ticker.systemTicker();

        private Builder() {}

        /**
         * Set the distributedlog client used for multi stream writer.
         *
         * @param client
         *          distributedlog client
         * @return builder
         */
        public Builder client(DistributedLogClient client) {
            this.client = client;
            return this;
        }

        /**
         * Set the list of streams to write to.
         *
         * @param streams
         *          list of streams to write
         * @return builder
         */
        public Builder streams(List<String> streams) {
            this.streams = streams;
            return this;
        }

        /**
         * Set the output buffer size.
         *
         * <p>If output buffer size is 0, the writes will be transmitted to
         * wire immediately.
         *
         * @param bufferSize
         *          output buffer size
         * @return builder
         */
        public Builder bufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        /**
         * Set the flush interval in milliseconds.
         *
         * @param flushIntervalMs
         *          flush interval in milliseconds.
         * @return builder
         */
        public Builder flushIntervalMs(int flushIntervalMs) {
            this.flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(flushIntervalMs);
            return this;
        }

        /**
         * Set the flush interval in microseconds.
         *
         * @param flushIntervalMicros
         *          flush interval in microseconds.
         * @return builder
         */
        public Builder flushIntervalMicros(int flushIntervalMicros) {
            this.flushIntervalMicros = flushIntervalMicros;
            return this;
        }

        /**
         * Set compression codec.
         *
         * @param codec compression codec.
         * @return builder
         */
        public Builder compressionCodec(CompressionCodec.Type codec) {
            this.codec = codec;
            return this;
        }

        /**
         * Set the scheduler to flush output buffers.
         *
         * <p>If no scheduler is provided, the writer creates its own single-threaded
         * scheduler and shuts it down on {@link DistributedLogMultiStreamWriter#close()}.
         *
         * @param executorService
         *          executor service to flush output buffers.
         * @return builder
         */
        public Builder scheduler(ScheduledExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        /**
         * Set request timeout in milliseconds.
         *
         * @param requestTimeoutMs
         *          request timeout in milliseconds.
         * @return builder
         */
        public Builder requestTimeoutMs(long requestTimeoutMs) {
            this.requestTimeoutMs = requestTimeoutMs;
            return this;
        }

        /**
         * Set the first speculative timeout in milliseconds.
         *
         * <p>The multi-streams writer does speculative writes on streams.
         * The writer issues the first write request to a stream; if the write request
         * doesn't respond within the speculative timeout, it issues the next write request
         * to a different stream. It does such speculative retries until it receives
         * a success or the request times out ({@link #requestTimeoutMs(long)}).
         *
         * <p>This setting is to configure the first speculative timeout, in milliseconds.
         *
         * @param timeoutMs
         *          timeout in milliseconds
         * @return builder
         */
        public Builder firstSpeculativeTimeoutMs(int timeoutMs) {
            this.firstSpeculativeTimeoutMs = timeoutMs;
            return this;
        }

        /**
         * Set the max speculative timeout in milliseconds.
         *
         * <p>The multi-streams writer does speculative writes on streams.
         * The writer issues the first write request to a stream; if the write request
         * doesn't respond within the speculative timeout, it issues the next write request
         * to a different stream. It does such speculative retries until it receives
         * a success or the request times out ({@link #requestTimeoutMs(long)}).
         *
         * <p>This setting is to configure the max speculative timeout, in milliseconds.
         *
         * @param timeoutMs
         *          timeout in milliseconds
         * @return builder
         */
        public Builder maxSpeculativeTimeoutMs(int timeoutMs) {
            this.maxSpeculativeTimeoutMs = timeoutMs;
            return this;
        }

        /**
         * Set the speculative timeout backoff multiplier.
         *
         * <p>The multi-streams writer does speculative writes on streams.
         * The writer issues the first write request to a stream; if the write request
         * doesn't respond within the speculative timeout, it issues the next write request
         * to a different stream. It does such speculative retries until it receives
         * a success or the request times out ({@link #requestTimeoutMs(long)}).
         *
         * <p>This setting is to configure the speculative timeout backoff multiplier.
         *
         * @param multiplier
         *          backoff multiplier
         * @return builder
         */
        public Builder speculativeBackoffMultiplier(float multiplier) {
            this.speculativeBackoffMultiplier = multiplier;
            return this;
        }

        /**
         * Ticker for timing.
         *
         * @param ticker
         *          ticker
         * @return builder
         * @see Ticker
         */
        public Builder clockTicker(Ticker ticker) {
            this.ticker = ticker;
            return this;
        }

        /**
         * Build the multi stream writer.
         *
         * <p>The buffer size is capped at {@code MAX_LOGRECORDSET_SIZE}.
         *
         * @return the multi stream writer.
         * @throws IllegalArgumentException if no streams were provided or the speculative
         *          timeout settings are inconsistent
         * @throws NullPointerException if the client or the compression codec is null
         */
        public DistributedLogMultiStreamWriter build() {
            checkArgument((null != streams && !streams.isEmpty()),
                    "No streams provided");
            checkNotNull(client,
                    "No distributedlog client provided");
            checkNotNull(codec,
                    "No compression codec provided");
            checkArgument(firstSpeculativeTimeoutMs > 0
                    && firstSpeculativeTimeoutMs <= maxSpeculativeTimeoutMs
                    && speculativeBackoffMultiplier > 0
                    && maxSpeculativeTimeoutMs < requestTimeoutMs,
                    "Invalid speculative timeout settings");
            return new DistributedLogMultiStreamWriter(
                    streams,
                    client,
                    Math.min(bufferSize, MAX_LOGRECORDSET_SIZE),
                    flushIntervalMicros,
                    requestTimeoutMs,
                    firstSpeculativeTimeoutMs,
                    maxSpeculativeTimeoutMs,
                    speculativeBackoffMultiplier,
                    codec,
                    ticker,
                    executorService);
        }
    }

    /**
     * Pending Write Request.
     *
     * <p>Tracks the transmission of one record set: which stream to try next, how many
     * streams were tried, and whether the record set was already completed or aborted.
     */
    class PendingWriteRequest implements FutureEventListener<DLSN>,
            SpeculativeRequestExecutor {

        private final LogRecordSetBuffer recordSet;
        private final AtomicBoolean complete = new AtomicBoolean(false);
        private final Stopwatch stopwatch = Stopwatch.createStarted(clockTicker);
        private int nextStream;
        private int numTriedStreams = 0;

        PendingWriteRequest(LogRecordSetBuffer recordSet) {
            this.recordSet = recordSet;
            // Clear the sign bit instead of using Math.abs(): Math.abs(Integer.MIN_VALUE) is
            // still negative, which would produce a negative stream index once the shared
            // counter wraps around.
            this.nextStream = (nextStreamId.incrementAndGet() & Integer.MAX_VALUE) % numStreams;
        }

        /**
         * Send the record set to the next stream.
         *
         * @return the stream the write was sent to, or null if the request was failed
         *         because it timed out or every stream has already been tried.
         */
        synchronized String sendNextWrite() {
            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            if (elapsedMs > requestTimeoutMs || numTriedStreams >= numStreams) {
                fail(new IndividualRequestTimeoutException(Duration.fromMilliseconds(elapsedMs)));
                return null;
            }
            try {
                return sendWriteToStream(nextStream);
            } finally {
                // advance even if the send threw, so the same stream is not retried
                nextStream = (nextStream + 1) % numStreams;
                ++numTriedStreams;
            }
        }

        synchronized String sendWriteToStream(int streamId) {
            String stream = getStream(streamId);
            client.writeRecordSet(stream, recordSet)
                    .addEventListener(this);
            return stream;
        }

        @Override
        public void onSuccess(DLSN dlsn) {
            // only the first outcome (success or failure) completes the record set
            if (!complete.compareAndSet(false, true)) {
                return;
            }
            recordSet.completeTransmit(
                    dlsn.getLogSegmentSequenceNo(),
                    dlsn.getEntryId(),
                    dlsn.getSlotId());
        }

        @Override
        public void onFailure(Throwable cause) {
            if (complete.get()) {
                // Another attempt already completed (or aborted) this record set; a late
                // failure must not trigger yet another write of the same data.
                return;
            }
            sendNextWrite();
        }

        private void fail(Throwable cause) {
            if (!complete.compareAndSet(false, true)) {
                return;
            }
            recordSet.abortTransmit(cause);
        }

        @Override
        public Future<Boolean> issueSpeculativeRequest() {
            return Future.value(!complete.get() && null != sendNextWrite());
        }
    }

    private final int numStreams;
    private final List<String> streams;
    private final DistributedLogClient client;
    private final int bufferSize;
    private final long requestTimeoutMs;
    private final SpeculativeRequestExecutionPolicy speculativePolicy;
    private final Ticker clockTicker;
    private final CompressionCodec.Type codec;
    private final ScheduledExecutorService scheduler;
    private final boolean ownScheduler;
    private final AtomicInteger nextStreamId;
    private LogRecordSet.Writer recordSetWriter;

    private DistributedLogMultiStreamWriter(List<String> streams,
                                            DistributedLogClient client,
                                            int bufferSize,
                                            long flushIntervalMicros,
                                            long requestTimeoutMs,
                                            int firstSpeculativeTimeoutMs,
                                            int maxSpeculativeTimeoutMs,
                                            float speculativeBackoffMultiplier,
                                            CompressionCodec.Type codec,
                                            Ticker clockTicker,
                                            ScheduledExecutorService scheduler) {
        // private copy: the list is shuffled below and must not alias the caller's list
        this.streams = Lists.newArrayList(streams);
        this.numStreams = this.streams.size();
        this.client = client;
        this.bufferSize = bufferSize;
        this.requestTimeoutMs = requestTimeoutMs;
        this.codec = codec;
        this.clockTicker = clockTicker;
        if (null == scheduler) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor(
                    new ThreadFactoryBuilder()
                            .setDaemon(true)
                            .setNameFormat("MultiStreamWriterFlushThread-%d")
                            .build());
            this.ownScheduler = true;
        } else {
            this.scheduler = scheduler;
            this.ownScheduler = false;
        }
        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
                firstSpeculativeTimeoutMs,
                maxSpeculativeTimeoutMs,
                speculativeBackoffMultiplier);
        // shuffle the streams
        Collections.shuffle(this.streams);
        this.nextStreamId = new AtomicInteger(0);
        this.recordSetWriter = newRecordSetWriter();

        // a non-positive interval disables periodic flushing
        if (flushIntervalMicros > 0) {
            this.scheduler.scheduleAtFixedRate(
                    this,
                    flushIntervalMicros,
                    flushIntervalMicros,
                    TimeUnit.MICROSECONDS);
        }
    }

    String getStream(int streamId) {
        return streams.get(streamId);
    }

    synchronized LogRecordSet.Writer getLogRecordSetWriter() {
        return recordSetWriter;
    }

    private LogRecordSet.Writer newRecordSetWriter() {
        return LogRecordSet.newWriter(
                bufferSize,
                codec);
    }

    /**
     * Append a record to the current record set.
     *
     * <p>The current record set is flushed first if appending the record would exceed
     * {@code MAX_LOGRECORDSET_SIZE}, and flushed afterwards if it has reached the
     * configured buffer size.
     *
     * @param buffer
     *          record to write; the record size is taken from {@code buffer.remaining()}
     * @return future of the write. It fails with {@link LogRecordTooLongException} if the
     *         record is larger than {@code MAX_LOGRECORD_SIZE}, or with the
     *         {@link WriteException} raised while appending the record (in which case the
     *         current record set is aborted and replaced by a new one).
     */
    public synchronized Future<DLSN> write(ByteBuffer buffer) {
        int logRecordSize = buffer.remaining();
        if (logRecordSize > MAX_LOGRECORD_SIZE) {
            return Future.exception(new LogRecordTooLongException(
                    "Log record of size " + logRecordSize + " written when only "
                            + MAX_LOGRECORD_SIZE + " is allowed"));
        }
        // if exceed max number of bytes
        if ((recordSetWriter.getNumBytes() + logRecordSize) > MAX_LOGRECORDSET_SIZE) {
            flush();
        }
        Promise<DLSN> writePromise = new Promise<DLSN>();
        try {
            recordSetWriter.writeRecord(buffer, writePromise);
        } catch (LogRecordTooLongException e) {
            return Future.exception(e);
        } catch (WriteException e) {
            recordSetWriter.abortTransmit(e);
            recordSetWriter = newRecordSetWriter();
            return Future.exception(e);
        }
        if (recordSetWriter.getNumBytes() >= bufferSize) {
            flush();
        }
        return writePromise;
    }

    /**
     * Periodic flush task: transmits the current record set if it is not empty.
     */
    @Override
    public void run() {
        flush();
    }

    private void flush() {
        LogRecordSet.Writer recordSetToFlush;
        synchronized (this) {
            if (recordSetWriter.getNumRecords() == 0) {
                return;
            }
            // swap in a fresh writer under the lock; transmit outside of it
            recordSetToFlush = recordSetWriter;
            recordSetWriter = newRecordSetWriter();
        }
        transmit(recordSetToFlush);
    }

    private void transmit(LogRecordSet.Writer recordSetToFlush) {
        PendingWriteRequest writeRequest =
                new PendingWriteRequest(recordSetToFlush);
        this.speculativePolicy.initiateSpeculativeRequest(scheduler, writeRequest);
    }

    /**
     * Close the writer.
     *
     * <p>Shuts down the flush scheduler if it was created by this writer; a scheduler
     * supplied through the builder is left running. This method does not flush records
     * that are still buffered.
     */
    public void close() {
        if (ownScheduler) {
            this.scheduler.shutdown();
        }
    }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.client.monitor;

import com.twitter.util.Future;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;

/**
 * Client-side interface of the distributedlog monitor service.
 */
public interface MonitorServiceClient {

    /**
     * Check the given stream.
     *
     * @param stream
     *          name of the stream to check.
     * @return future representing the outcome of the check request.
     */
    Future<Void> check(String stream);

    /**
     * Send a heartbeat to the given stream and its readers.
     *
     * @param stream
     *          name of the stream to send the heartbeat to.
     * @return future representing the outcome of the heartbeat request.
     */
    Future<Void> heartbeat(String stream);

    /**
     * Get the ownership distribution as currently seen by the monitor service.
     *
     * @return mapping from a host address to the set of streams it owns.
     */
    Map<SocketAddress, Set<String>> getStreamOwnershipDistribution();

    /**
     * Enable or disable accepting new streams on a given proxy.
     *
     * @param enabled
     *          flag to enable/disable accepting new streams on a given proxy
     * @return future representing the outcome of the request.
     */
    Future<Void> setAcceptNewStream(boolean enabled);

    /**
     * Close the client.
     */
    void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java new file mode 100644 index 0000000..d7e2c94 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * DistributedLog Monitor Client. + */ +package org.apache.distributedlog.client.monitor; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java new file mode 100644 index 0000000..f3c24ca --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.distributedlog.client.ownership;

import com.google.common.collect.ImmutableMap;
import org.apache.distributedlog.client.ClientConfig;
import org.apache.distributedlog.client.stats.OwnershipStatsLogger;
import com.twitter.finagle.stats.StatsReceiver;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client Side Ownership Cache.
 *
 * <p>Keeps two views of stream ownership: stream name to owner address, and owner address
 * to the set of streams it owns. When enabled in the client config, the cache periodically
 * logs its content by rescheduling itself on the provided timer.
 */
public class OwnershipCache implements TimerTask {

    private static final Logger logger = LoggerFactory.getLogger(OwnershipCache.class);

    // stream name -> owner address
    private final ConcurrentHashMap<String, SocketAddress> stream2Addresses =
            new ConcurrentHashMap<String, SocketAddress>();
    // owner address -> streams owned; each set is guarded by synchronizing on the set itself
    private final ConcurrentHashMap<SocketAddress, Set<String>> address2Streams =
            new ConcurrentHashMap<SocketAddress, Set<String>>();
    private final ClientConfig clientConfig;
    private final HashedWheelTimer timer;

    // Stats
    private final OwnershipStatsLogger ownershipStatsLogger;

    public OwnershipCache(ClientConfig clientConfig,
                          HashedWheelTimer timer,
                          StatsReceiver statsReceiver,
                          StatsReceiver streamStatsReceiver) {
        this.clientConfig = clientConfig;
        this.timer = timer;
        this.ownershipStatsLogger = new OwnershipStatsLogger(statsReceiver, streamStatsReceiver);
        scheduleDumpOwnershipCache();
    }

    /**
     * Schedule the next dump of the cache, if periodic dumping is enabled and the
     * configured interval is positive.
     */
    private void scheduleDumpOwnershipCache() {
        if (!clientConfig.isPeriodicDumpOwnershipCacheEnabled()) {
            return;
        }
        if (clientConfig.getPeriodicDumpOwnershipCacheIntervalMs() <= 0) {
            return;
        }
        timer.newTimeout(this, clientConfig.getPeriodicDumpOwnershipCacheIntervalMs(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Timer callback: log the cache content and schedule the next dump.
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled()) {
            return;
        }
        logger.info("Ownership cache : {} streams cached, {} hosts cached",
                stream2Addresses.size(), address2Streams.size());
        logger.info("Cached streams : {}", stream2Addresses);
        scheduleDumpOwnershipCache();
    }

    public OwnershipStatsLogger getOwnershipStatsLogger() {
        return ownershipStatsLogger;
    }

    /**
     * Update ownership of <i>stream</i> to <i>addr</i>.
     *
     * @param stream
     *          Stream Name.
     * @param addr
     *          Owner Address.
     * @return true if <i>addr</i> is (or already was) recorded as the owner; false if the
     *         ownership was changed concurrently and this update was dropped.
     */
    public boolean updateOwner(String stream, SocketAddress addr) {
        final SocketAddress previousOwner = stream2Addresses.putIfAbsent(stream, addr);
        if (null == previousOwner) {
            // first owner seen for this stream
            logger.info("Storing ownership for stream : {}, host : {}.", stream, addr);
            // update stats
            ownershipStatsLogger.onAdd(stream);
        } else if (previousOwner.equals(addr)) {
            // nothing changed
            return true;
        } else if (!stream2Addresses.replace(stream, previousOwner, addr)) {
            // somebody else changed the owner between putIfAbsent and replace
            logger.warn("Ownership of stream : {} has been changed from {} to {} when storing host : {}.",
                    new Object[] { stream, previousOwner, stream2Addresses.get(stream), addr });
            return false;
        } else {
            // Store the relevant mappings for this topic and host combination
            logger.info("Storing ownership for stream : {}, old host : {}, new host : {}.",
                    new Object[] { stream, previousOwner, addr });
            final String reason = "Ownership changed '" + previousOwner + "' -> '" + addr + "'";
            removeOwnerFromStream(stream, previousOwner, reason);

            // update stats
            ownershipStatsLogger.onRemove(stream);
            ownershipStatsLogger.onAdd(stream);
        }

        final Set<String> hostStreams = streamsForHost(addr);
        synchronized (hostStreams) {
            // re-check the owner: it might have changed again after the update above succeeded
            if (addr.equals(stream2Addresses.get(stream))) {
                hostStreams.add(stream);
            }
        }
        return true;
    }

    /**
     * Get the stream set registered for <code>addr</code>, registering a new empty set
     * if there is none yet.
     */
    private Set<String> streamsForHost(SocketAddress addr) {
        final Set<String> existing = address2Streams.get(addr);
        if (null != existing) {
            return existing;
        }
        final Set<String> created = new HashSet<String>();
        final Set<String> raced = address2Streams.putIfAbsent(addr, created);
        return null == raced ? created : raced;
    }

    /**
     * Get the cached owner for stream <code>stream</code>, recording a cache hit or miss.
     *
     * @param stream
     *          stream to lookup ownership
     * @return owner's address, or null if no owner is cached
     */
    public SocketAddress getOwner(String stream) {
        final SocketAddress owner = stream2Addresses.get(stream);
        if (null != owner) {
            ownershipStatsLogger.onHit(stream);
        } else {
            ownershipStatsLogger.onMiss(stream);
        }
        return owner;
    }

    /**
     * Remove the owner <code>addr</code> from <code>stream</code> for a given <code>reason</code>.
     *
     * @param stream stream name
     * @param addr owner address
     * @param reason reason to remove ownership
     */
    public void removeOwnerFromStream(String stream, SocketAddress addr, String reason) {
        if (stream2Addresses.remove(stream, addr)) {
            logger.info("Removed stream to host mapping for (stream: {} -> host: {}) : reason = '{}'.",
                    new Object[] { stream, addr, reason });
        }
        final Set<String> hostStreams = address2Streams.get(addr);
        if (null == hostStreams) {
            return;
        }
        synchronized (hostStreams) {
            if (!hostStreams.remove(stream)) {
                return;
            }
            logger.info("Removed stream ({}) from host {} : reason = '{}'.",
                    new Object[] { stream, addr, reason });
            // drop the host entry once its last stream is gone
            if (hostStreams.isEmpty()) {
                address2Streams.remove(addr, hostStreams);
            }
            ownershipStatsLogger.onRemove(stream);
        }
    }

    /**
     * Remove all streams from host <code>addr</code>.
     *
     * @param addr
     *          host to remove ownerships
     */
    public void removeAllStreamsFromOwner(SocketAddress addr) {
        logger.info("Remove streams mapping for host {}", addr);
        final Set<String> hostStreams = address2Streams.get(addr);
        if (null == hostStreams) {
            return;
        }
        synchronized (hostStreams) {
            for (String ownedStream : hostStreams) {
                // only drop mappings that still point at this host
                if (stream2Addresses.remove(ownedStream, addr)) {
                    logger.info("Removing mapping for stream : {} from host : {}", ownedStream, addr);
                    ownershipStatsLogger.onRemove(ownedStream);
                }
            }
            address2Streams.remove(addr, hostStreams);
        }
    }

    /**
     * Get the number of cached streams.
     *
     * @return number of cached streams.
     */
    public int getNumCachedStreams() {
        return stream2Addresses.size();
    }

    /**
     * Get the stream ownership distribution across proxies.
     *
     * @return immutable copy of the host to streams mapping; the stream sets themselves
     *         are the cache's own sets, not copies.
     */
    public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
        return ImmutableMap.copyOf(address2Streams);
    }

    /**
     * Get the stream ownership mapping.
     *
     * @return the cache's own stream to owner mapping (not a copy).
     */
    public Map<String, SocketAddress> getStreamOwnerMapping() {
        return stream2Addresses;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java new file mode 100644 index 0000000..486bd6f --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utils for managing ownership at client side. + */ +package org.apache.distributedlog.client.ownership; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/package-info.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/package-info.java new file mode 100644 index 0000000..d22b0da --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * DistributedLog Client. + */ +package org.apache.distributedlog.client; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java new file mode 100644 index 0000000..9b5c7f6 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.distributedlog.client.proxy;

import org.apache.distributedlog.thrift.service.DistributedLogService;
import com.twitter.finagle.Service;
import com.twitter.finagle.thrift.ThriftClientRequest;
import com.twitter.util.Future;
import scala.runtime.BoxedUnit;

/**
 * Cluster client.
 *
 * <p>Pairs the finagle service with the distributedlog service interface handed to the
 * constructor, and closes the finagle service on {@link #close()}.
 */
public class ClusterClient {

    private final Service<ThriftClientRequest, byte[]> client;
    private final DistributedLogService.ServiceIface service;

    /**
     * Create a cluster client.
     *
     * @param client
     *          finagle service; this is what {@link #close()} closes
     * @param service
     *          distributedlog service interface
     */
    public ClusterClient(Service<ThriftClientRequest, byte[]> client,
                         DistributedLogService.ServiceIface service) {
        this.client = client;
        this.service = service;
    }

    /**
     * Get the finagle service.
     *
     * @return the finagle service passed at construction
     */
    public Service<ThriftClientRequest, byte[]> getClient() {
        return this.client;
    }

    /**
     * Get the distributedlog service interface.
     *
     * @return the service interface passed at construction
     */
    public DistributedLogService.ServiceIface getService() {
        return this.service;
    }

    /**
     * Close the finagle service.
     *
     * @return the future returned by closing the finagle service
     */
    public Future<BoxedUnit> close() {
        return this.client.close();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java new file mode 100644 index 0000000..769cca8 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Supplies the set of hosts to handshake with.
 */
public interface HostProvider {

    /**
     * Get the hosts that should be used for handshaking.
     *
     * @return set of host addresses for handshaking.
     */
    Set<SocketAddress> getHosts();

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.proxy; + +import org.apache.distributedlog.client.ClientConfig; +import org.apache.distributedlog.client.stats.ClientStats; +import org.apache.distributedlog.thrift.service.DistributedLogService; +import com.twitter.finagle.Service; +import com.twitter.finagle.ThriftMux; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.thrift.ClientId; +import com.twitter.finagle.thrift.ThriftClientFramedCodec; +import com.twitter.finagle.thrift.ThriftClientRequest; +import com.twitter.util.Duration; +import com.twitter.util.Future; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import org.apache.thrift.protocol.TBinaryProtocol; +import scala.Option; +import scala.runtime.BoxedUnit; + +/** + * Client talks to a single proxy. + */ +public class ProxyClient { + + /** + * Builder to build a proxy client talking to given host <code>address</code>. + */ + public interface Builder { + /** + * Build a proxy client to <code>address</code>. + * + * @param address + * proxy address + * @return proxy client + */ + ProxyClient build(SocketAddress address); + } + + public static Builder newBuilder(String clientName, + ClientId clientId, + ClientBuilder clientBuilder, + ClientConfig clientConfig, + ClientStats clientStats) { + return new DefaultBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats); + } + + /** + * Default Builder for {@link ProxyClient}. 
+ */ + public static class DefaultBuilder implements Builder { + + private final String clientName; + private final ClientId clientId; + private final ClientBuilder clientBuilder; + private final ClientStats clientStats; + + private DefaultBuilder(String clientName, + ClientId clientId, + ClientBuilder clientBuilder, + ClientConfig clientConfig, + ClientStats clientStats) { + this.clientName = clientName; + this.clientId = clientId; + this.clientStats = clientStats; + // client builder + ClientBuilder builder = setDefaultSettings( + null == clientBuilder ? getDefaultClientBuilder(clientConfig) : clientBuilder); + this.clientBuilder = configureThriftMux(builder, clientId, clientConfig); + } + + @SuppressWarnings("unchecked") + private ClientBuilder configureThriftMux(ClientBuilder builder, + ClientId clientId, + ClientConfig clientConfig) { + if (clientConfig.getThriftMux()) { + return builder.stack(ThriftMux.client().withClientId(clientId)); + } else { + return builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId))); + } + } + + private ClientBuilder getDefaultClientBuilder(ClientConfig clientConfig) { + ClientBuilder builder = ClientBuilder.get() + .tcpConnectTimeout(Duration.fromMilliseconds(200)) + .connectTimeout(Duration.fromMilliseconds(200)) + .requestTimeout(Duration.fromSeconds(1)); + if (!clientConfig.getThriftMux()) { + builder = builder.hostConnectionLimit(1); + } + return builder; + } + + @SuppressWarnings("unchecked") + private ClientBuilder setDefaultSettings(ClientBuilder builder) { + return builder.name(clientName) + .failFast(false) + .noFailureAccrual() + // disable retries on finagle client builder, as there is only one host per finagle client + // we should throw exception immediately on first failure, so DL client could quickly detect + // failures and retry other proxies. 
+ .retries(1) + .keepAlive(true); + } + + @Override + @SuppressWarnings("unchecked") + public ProxyClient build(SocketAddress address) { + Service<ThriftClientRequest, byte[]> client = + ClientBuilder.safeBuildFactory( + clientBuilder + .hosts((InetSocketAddress) address) + .reportTo(clientStats.getFinagleStatsReceiver(address)) + ).toService(); + DistributedLogService.ServiceIface service = + new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory()); + return new ProxyClient(address, client, service); + } + + } + + private final SocketAddress address; + private final Service<ThriftClientRequest, byte[]> client; + private final DistributedLogService.ServiceIface service; + + protected ProxyClient(SocketAddress address, + Service<ThriftClientRequest, byte[]> client, + DistributedLogService.ServiceIface service) { + this.address = address; + this.client = client; + this.service = service; + } + + public SocketAddress getAddress() { + return address; + } + + public Service<ThriftClientRequest, byte[]> getClient() { + return client; + } + + public DistributedLogService.ServiceIface getService() { + return service; + } + + public Future<BoxedUnit> close() { + return client.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java new file mode 100644 index 0000000..17b70be --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.proxy; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import org.apache.distributedlog.client.ClientConfig; +import org.apache.distributedlog.client.stats.ClientStats; +import org.apache.distributedlog.client.stats.OpStats; +import org.apache.distributedlog.thrift.service.ClientInfo; +import org.apache.distributedlog.thrift.service.ServerInfo; +import com.twitter.util.FutureEventListener; +import java.net.SocketAddress; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager manages clients (channels) to proxies. 
+ */ +public class ProxyClientManager implements TimerTask { + + private static final Logger logger = LoggerFactory.getLogger(ProxyClientManager.class); + + private final ClientConfig clientConfig; + private final ProxyClient.Builder clientBuilder; + private final HashedWheelTimer timer; + private final HostProvider hostProvider; + private volatile Timeout periodicHandshakeTask; + private final ConcurrentHashMap<SocketAddress, ProxyClient> address2Services = + new ConcurrentHashMap<SocketAddress, ProxyClient>(); + private final CopyOnWriteArraySet<ProxyListener> proxyListeners = + new CopyOnWriteArraySet<ProxyListener>(); + private volatile boolean closed = false; + private volatile boolean periodicHandshakeEnabled = true; + private final Stopwatch lastOwnershipSyncStopwatch; + + private final OpStats handshakeStats; + + public ProxyClientManager(ClientConfig clientConfig, + ProxyClient.Builder clientBuilder, + HashedWheelTimer timer, + HostProvider hostProvider, + ClientStats clientStats) { + this.clientConfig = clientConfig; + this.clientBuilder = clientBuilder; + this.timer = timer; + this.hostProvider = hostProvider; + this.handshakeStats = clientStats.getOpStats("handshake"); + scheduleHandshake(); + this.lastOwnershipSyncStopwatch = Stopwatch.createStarted(); + } + + private void scheduleHandshake() { + if (clientConfig.getPeriodicHandshakeIntervalMs() > 0) { + periodicHandshakeTask = timer.newTimeout(this, + clientConfig.getPeriodicHandshakeIntervalMs(), TimeUnit.MILLISECONDS); + } + } + + void setPeriodicHandshakeEnabled(boolean enabled) { + this.periodicHandshakeEnabled = enabled; + } + + @Override + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled() || closed) { + return; + } + if (periodicHandshakeEnabled) { + final boolean syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS) + >= clientConfig.getPeriodicOwnershipSyncIntervalMs(); + + final Set<SocketAddress> hostsSnapshot = hostProvider.getHosts(); + 
final AtomicInteger numHosts = new AtomicInteger(hostsSnapshot.size()); + final AtomicInteger numStreams = new AtomicInteger(0); + final AtomicInteger numSuccesses = new AtomicInteger(0); + final AtomicInteger numFailures = new AtomicInteger(0); + final ConcurrentMap<SocketAddress, Integer> streamDistributions = + new ConcurrentHashMap<SocketAddress, Integer>(); + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (SocketAddress host : hostsSnapshot) { + final SocketAddress address = host; + final ProxyClient client = getClient(address); + handshake(address, client, new FutureEventListener<ServerInfo>() { + @Override + public void onSuccess(ServerInfo serverInfo) { + numStreams.addAndGet(serverInfo.getOwnershipsSize()); + numSuccesses.incrementAndGet(); + notifyHandshakeSuccess(address, client, serverInfo, false, stopwatch); + if (clientConfig.isHandshakeTracingEnabled()) { + streamDistributions.putIfAbsent(address, serverInfo.getOwnershipsSize()); + } + complete(); + } + + @Override + public void onFailure(Throwable cause) { + numFailures.incrementAndGet(); + notifyHandshakeFailure(address, client, cause, stopwatch); + complete(); + } + + private void complete() { + if (0 == numHosts.decrementAndGet()) { + if (syncOwnerships) { + logger.info("Periodic handshaked with {} hosts : {} streams returned," + + " {} hosts succeeded, {} hosts failed", + new Object[] { + hostsSnapshot.size(), + numStreams.get(), + numSuccesses.get(), + numFailures.get()}); + if (clientConfig.isHandshakeTracingEnabled()) { + logger.info("Periodic handshaked stream distribution : {}", streamDistributions); + } + } + } + } + }, false, syncOwnerships); + } + + if (syncOwnerships) { + lastOwnershipSyncStopwatch.reset().start(); + } + } + scheduleHandshake(); + } + + /** + * Register a proxy <code>listener</code> on proxy related changes. 
+ * + * @param listener + * proxy listener + */ + public void registerProxyListener(ProxyListener listener) { + proxyListeners.add(listener); + } + + private void notifyHandshakeSuccess(SocketAddress address, + ProxyClient client, + ServerInfo serverInfo, + boolean logging, + Stopwatch stopwatch) { + if (logging) { + if (null != serverInfo && serverInfo.isSetOwnerships()) { + logger.info("Handshaked with {} : {} ownerships returned.", + address, serverInfo.getOwnerships().size()); + } else { + logger.info("Handshaked with {} : no ownerships returned", address); + } + } + handshakeStats.completeRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1); + for (ProxyListener listener : proxyListeners) { + listener.onHandshakeSuccess(address, client, serverInfo); + } + } + + private void notifyHandshakeFailure(SocketAddress address, + ProxyClient client, + Throwable cause, + Stopwatch stopwatch) { + handshakeStats.failRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1); + for (ProxyListener listener : proxyListeners) { + listener.onHandshakeFailure(address, client, cause); + } + } + + /** + * Retrieve a client to proxy <code>address</code>. + * + * @param address + * proxy address + * @return proxy client + */ + public ProxyClient getClient(final SocketAddress address) { + ProxyClient sc = address2Services.get(address); + if (null != sc) { + return sc; + } + return createClient(address); + } + + /** + * Remove the client to proxy <code>address</code>. + * + * @param address + * proxy address + */ + public void removeClient(SocketAddress address) { + ProxyClient sc = address2Services.remove(address); + if (null != sc) { + logger.info("Removed host {}.", address); + sc.close(); + } + } + + /** + * Remove the client <code>sc</code> to proxy <code>address</code>. 
+ * + * @param address + * proxy address + * @param sc + * proxy client + */ + public void removeClient(SocketAddress address, ProxyClient sc) { + if (address2Services.remove(address, sc)) { + logger.info("Remove client {} to host {}.", sc, address); + sc.close(); + } + } + + /** + * Create a client to proxy <code>address</code>. + * + * @param address + * proxy address + * @return proxy client + */ + public ProxyClient createClient(final SocketAddress address) { + final ProxyClient sc = clientBuilder.build(address); + ProxyClient oldSC = address2Services.putIfAbsent(address, sc); + if (null != oldSC) { + sc.close(); + return oldSC; + } else { + final Stopwatch stopwatch = Stopwatch.createStarted(); + FutureEventListener<ServerInfo> listener = new FutureEventListener<ServerInfo>() { + @Override + public void onSuccess(ServerInfo serverInfo) { + notifyHandshakeSuccess(address, sc, serverInfo, true, stopwatch); + } + @Override + public void onFailure(Throwable cause) { + notifyHandshakeFailure(address, sc, cause, stopwatch); + } + }; + // send a ping messaging after creating connections. + handshake(address, sc, listener, true, true); + return sc; + } + } + + /** + * Handshake with a given proxy. 
+ * + * @param address + * proxy address + * @param sc + * proxy client + * @param listener + * listener on handshake result + */ + private void handshake(SocketAddress address, + ProxyClient sc, + FutureEventListener<ServerInfo> listener, + boolean logging, + boolean getOwnerships) { + if (clientConfig.getHandshakeWithClientInfo()) { + ClientInfo clientInfo = new ClientInfo(); + clientInfo.setGetOwnerships(getOwnerships); + clientInfo.setStreamNameRegex(clientConfig.getStreamNameRegex()); + if (logging) { + logger.info("Handshaking with {} : {}", address, clientInfo); + } + sc.getService().handshakeWithClientInfo(clientInfo) + .addEventListener(listener); + } else { + if (logging) { + logger.info("Handshaking with {}", address); + } + sc.getService().handshake().addEventListener(listener); + } + } + + /** + * Handshake with all proxies. + * + * <p>NOTE: this is a synchronous call. + */ + public void handshake() { + Set<SocketAddress> hostsSnapshot = hostProvider.getHosts(); + logger.info("Handshaking with {} hosts.", hostsSnapshot.size()); + final CountDownLatch latch = new CountDownLatch(hostsSnapshot.size()); + final Stopwatch stopwatch = Stopwatch.createStarted(); + for (SocketAddress host: hostsSnapshot) { + final SocketAddress address = host; + final ProxyClient client = getClient(address); + handshake(address, client, new FutureEventListener<ServerInfo>() { + @Override + public void onSuccess(ServerInfo serverInfo) { + notifyHandshakeSuccess(address, client, serverInfo, true, stopwatch); + latch.countDown(); + } + @Override + public void onFailure(Throwable cause) { + notifyHandshakeFailure(address, client, cause, stopwatch); + latch.countDown(); + } + }, true, true); + } + try { + latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + logger.warn("Interrupted on handshaking with servers : ", e); + } + } + + /** + * Return number of proxies managed by client manager. + * + * @return number of proxies managed by client manager. 
+ */ + public int getNumProxies() { + return address2Services.size(); + } + + /** + * Return all clients. + * + * @return all clients. + */ + public Map<SocketAddress, ProxyClient> getAllClients() { + return ImmutableMap.copyOf(address2Services); + } + + public void close() { + closed = true; + Timeout task = periodicHandshakeTask; + if (null != task) { + task.cancel(); + } + for (ProxyClient sc : address2Services.values()) { + sc.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java new file mode 100644 index 0000000..0a6b076 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.client.proxy; + +import org.apache.distributedlog.thrift.service.ServerInfo; +import java.net.SocketAddress; + +/** + * Listener on server changes. + */ +public interface ProxyListener { + /** + * When a proxy's server info changed, it would be notified. + * + * @param address + * proxy address + * @param client + * proxy client that executes handshaking + * @param serverInfo + * proxy's server info + */ + void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo); + + /** + * Failed to handshake with a proxy. + * + * @param address + * proxy address + * @param client + * proxy client + * @param cause + * failure reason + */ + void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java new file mode 100644 index 0000000..4161afb --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Clients that interact with individual proxies. + */ +package org.apache.distributedlog.client.proxy; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java new file mode 100644 index 0000000..2ac5be3 --- /dev/null +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.resolver; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Default implementation of {@link RegionResolver}. + */ +public class DefaultRegionResolver implements RegionResolver { + + private static final String DEFAULT_REGION = "default-region"; + + private final Map<SocketAddress, String> regionOverrides = + new HashMap<SocketAddress, String>(); + private final ConcurrentMap<SocketAddress, String> regionMap = + new ConcurrentHashMap<SocketAddress, String>(); + + public DefaultRegionResolver() { + } + + public DefaultRegionResolver(Map<SocketAddress, String> regionOverrides) { + this.regionOverrides.putAll(regionOverrides); + } + + @Override + public String resolveRegion(SocketAddress address) { + String region = regionMap.get(address); + if (null == region) { + region = doResolveRegion(address); + regionMap.put(address, region); + } + return region; + } + + private String doResolveRegion(SocketAddress address) { + String region = regionOverrides.get(address); + if (null != region) { + return region; + } + + String domainName; + if (address instanceof InetSocketAddress) { + InetSocketAddress iAddr = (InetSocketAddress) address; + domainName = iAddr.getHostName(); + } else { + domainName = address.toString(); + } + String[] parts = domainName.split("\\."); + if (parts.length <= 0) { + return DEFAULT_REGION; + } + String hostName = parts[0]; + String[] labels = hostName.split("-"); + if (labels.length != 4) { + return DEFAULT_REGION; + } + return labels[0]; + } + + @Override + public void removeCachedHost(SocketAddress address) { + regionMap.remove(address); + } +} 
/**
 * Maps socket addresses to regions.
 */
public interface RegionResolver {

    /**
     * Resolve the region of an address.
     *
     * @param address
     *          socket address
     * @return region of the address
     */
    String resolveRegion(SocketAddress address);

    /**
     * Remove the cached host.
     *
     * @param address
     *          socket address whose cached entry should be removed.
     */
    void removeCachedHost(SocketAddress address);
}