[DL][service] Create stream operation should not be submitted by StreamImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/01a7306b Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/01a7306b Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/01a7306b Branch: refs/heads/master Commit: 01a7306b59ab5348b0df634f10fbd7c9434f37ad Parents: 1feaeb4 Author: Sijie Guo <sij...@twitter.com> Authored: Fri Dec 9 16:23:23 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Tue Dec 27 16:49:30 2016 -0800 ---------------------------------------------------------------------- .../service/DistributedLogServiceImpl.java | 18 +++- .../service/stream/AbstractStreamOp.java | 6 +- .../distributedlog/service/stream/CreateOp.java | 53 ---------- .../service/stream/admin/AdminOp.java | 40 ++++++++ .../service/stream/admin/CreateOp.java | 54 ++++++++++ .../service/stream/admin/StreamAdminOp.java | 101 +++++++++++++++++++ 6 files changed, 212 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index 677ade5..5c5b5af 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -28,6 +28,7 @@ import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; import 
com.twitter.distributedlog.client.resolver.RegionResolver; import com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; +import com.twitter.distributedlog.exceptions.DLException; import com.twitter.distributedlog.exceptions.RegionUnavailableException; import com.twitter.distributedlog.exceptions.ServiceUnavailableException; import com.twitter.distributedlog.exceptions.StreamUnavailableException; @@ -38,8 +39,8 @@ import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; import com.twitter.distributedlog.service.config.ServerConfiguration; import com.twitter.distributedlog.service.config.StreamConfigProvider; import com.twitter.distributedlog.service.stream.BulkWriteOp; -import com.twitter.distributedlog.service.stream.CreateOp; import com.twitter.distributedlog.service.stream.DeleteOp; +import com.twitter.distributedlog.service.stream.admin.CreateOp; import com.twitter.distributedlog.service.stream.HeartbeatOp; import com.twitter.distributedlog.service.stream.ReleaseOp; import com.twitter.distributedlog.service.stream.Stream; @@ -50,8 +51,9 @@ import com.twitter.distributedlog.service.stream.StreamManagerImpl; import com.twitter.distributedlog.service.stream.StreamOp; import com.twitter.distributedlog.service.stream.StreamOpStats; import com.twitter.distributedlog.service.stream.TruncateOp; -import com.twitter.distributedlog.service.stream.WriteOpWithPayload; import com.twitter.distributedlog.service.stream.WriteOp; +import com.twitter.distributedlog.service.stream.WriteOpWithPayload; +import com.twitter.distributedlog.service.stream.admin.StreamAdminOp; import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter; import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; import com.twitter.distributedlog.service.utils.ServerUtils; @@ -485,8 +487,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI 
@Override public Future<WriteResponse> create(String stream, WriteContext ctx) { CreateOp op = new CreateOp(stream, statsLogger, streamManager, getChecksum(ctx), featureChecksumDisabled); - executeStreamOp(op); - return op.result(); + return executeStreamAdminOp(op); } // @@ -574,6 +575,15 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI return ctx.isSetCrc32() ? ctx.getCrc32() : null; } + private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) { + try { + op.preExecute(); + } catch (DLException dle) { + return Future.exception(dle); + } + return op.execute(); + } + private void executeStreamOp(final StreamOp op) { // Must attach this as early as possible--returning before this point will cause us to http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java index 82597f3..fbef587 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java @@ -158,15 +158,15 @@ public abstract class AbstractStreamOp<Response> implements StreamOp { // fail the result with the given response header protected abstract void fail(ResponseHeader header); - protected static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) { + public static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) { return requestLogger(statsLogger).getOpStatsLogger(opName); } - protected static StatsLogger requestLogger(StatsLogger statsLogger) { + public static 
StatsLogger requestLogger(StatsLogger statsLogger) { return statsLogger.scope("request"); } - protected static StatsLogger requestScope(StatsLogger statsLogger, String scope) { + public static StatsLogger requestScope(StatsLogger statsLogger, String scope) { return requestLogger(statsLogger).scope(scope); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java deleted file mode 100644 index 96b8435..0000000 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.service.stream; - -import com.twitter.distributedlog.AsyncLogWriter; -import com.twitter.distributedlog.service.ResponseUtils; -import com.twitter.distributedlog.thrift.service.WriteResponse; -import com.twitter.distributedlog.util.Sequencer; -import com.twitter.util.Future; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.StatsLogger; -import scala.runtime.AbstractFunction1; - -public class CreateOp extends AbstractWriteOp { - private final StreamManager streamManager; - - public CreateOp(String stream, - StatsLogger statsLogger, - StreamManager streamManager, - Long checksum, - Feature checksumEnabledFeature) { - super(stream, requestStat(statsLogger, "create"), checksum, checksumEnabledFeature); - this.streamManager = streamManager; - } - - @Override - protected Future<WriteResponse> executeOp(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock) { - Future<Void> result = streamManager.createStreamAsync(streamName()); - return result.map(new AbstractFunction1<Void, WriteResponse>() { - @Override - public WriteResponse apply(Void value) { - return ResponseUtils.writeSuccess(); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java new file mode 100644 index 0000000..7ac4986 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.service.stream.admin; + +import com.twitter.distributedlog.exceptions.DLException; +import com.twitter.util.Future; + +/** + * An admin operation + */ +public interface AdminOp<Response> { + + /** + * Invoked before the stream op is executed. + */ + void preExecute() throws DLException; + + /** + * Execute the operation. + * + * @return the future represents the response of the operation + */ + Future<Response> execute(); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java new file mode 100644 index 0000000..2e1f490 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.service.stream.admin; + +import com.twitter.distributedlog.service.ResponseUtils; +import com.twitter.distributedlog.service.stream.StreamManager; +import com.twitter.distributedlog.thrift.service.WriteResponse; +import com.twitter.util.Future; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.StatsLogger; +import scala.runtime.AbstractFunction1; + +import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat; + +public class CreateOp extends StreamAdminOp { + + public CreateOp(String stream, + StatsLogger statsLogger, + StreamManager streamManager, + Long checksum, + Feature checksumEnabledFeature) { + super(stream, + streamManager, + requestStat(statsLogger, "create"), + checksum, + checksumEnabledFeature); + } + + @Override + protected Future<WriteResponse> executeOp() { + Future<Void> result = streamManager.createStreamAsync(stream); + return result.map(new AbstractFunction1<Void, WriteResponse>() { + @Override + public WriteResponse apply(Void value) { + return ResponseUtils.writeSuccess(); + } + }); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java new file mode 100644 index 0000000..37c6e14 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.service.stream.admin; + +import com.google.common.base.Stopwatch; +import com.twitter.distributedlog.exceptions.ChecksumFailedException; +import com.twitter.distributedlog.exceptions.DLException; +import com.twitter.distributedlog.service.ResponseUtils; +import com.twitter.distributedlog.service.stream.StreamManager; +import com.twitter.distributedlog.thrift.service.WriteResponse; +import com.twitter.distributedlog.util.ProtocolUtils; +import com.twitter.util.Future; +import com.twitter.util.FutureTransformer; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.OpStatsLogger; + +import java.util.concurrent.TimeUnit; + +/** + * Stream admin op + */ +public abstract class StreamAdminOp implements AdminOp<WriteResponse> { + + protected final String stream; + protected final StreamManager streamManager; + protected final OpStatsLogger opStatsLogger; + protected final Stopwatch stopwatch = Stopwatch.createUnstarted(); + protected final Long checksum; + protected final Feature checksumDisabledFeature; + + protected StreamAdminOp(String stream, + StreamManager streamManager, + OpStatsLogger statsLogger, + Long checksum, + Feature checksumDisabledFeature) { + this.stream = stream; + this.streamManager = streamManager; + this.opStatsLogger = statsLogger; + // start here in case the operation is failed before executing. + stopwatch.reset().start(); + this.checksum = checksum; + this.checksumDisabledFeature = checksumDisabledFeature; + } + + protected Long computeChecksum() { + return ProtocolUtils.streamOpCRC32(stream); + } + + @Override + public void preExecute() throws DLException { + if (!checksumDisabledFeature.isAvailable() && null != checksum) { + Long serverChecksum = computeChecksum(); + if (null != serverChecksum && !checksum.equals(serverChecksum)) { + throw new ChecksumFailedException(); + } + } + } + + /** + * Execute the operation. 
+ * + * @return a future holding the response of the operation + */ + protected abstract Future<WriteResponse> executeOp(); + + @Override + public Future<WriteResponse> execute() { + return executeOp().transformedBy(new FutureTransformer<WriteResponse, WriteResponse>() { + + @Override + public WriteResponse map(WriteResponse response) { + opStatsLogger.registerSuccessfulEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS)); + return response; + } + + @Override + public WriteResponse handle(Throwable cause) { + opStatsLogger.registerFailedEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS)); + return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause)); + } + + }); + } +}