[DL][service] Create stream operation should not be submitted by StreamImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/01a7306b
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/01a7306b
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/01a7306b

Branch: refs/heads/master
Commit: 01a7306b59ab5348b0df634f10fbd7c9434f37ad
Parents: 1feaeb4
Author: Sijie Guo <sij...@twitter.com>
Authored: Fri Dec 9 16:23:23 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Tue Dec 27 16:49:30 2016 -0800

----------------------------------------------------------------------
 .../service/DistributedLogServiceImpl.java      |  18 +++-
 .../service/stream/AbstractStreamOp.java        |   6 +-
 .../distributedlog/service/stream/CreateOp.java |  53 ----------
 .../service/stream/admin/AdminOp.java           |  40 ++++++++
 .../service/stream/admin/CreateOp.java          |  54 ++++++++++
 .../service/stream/admin/StreamAdminOp.java     | 101 +++++++++++++++++++
 6 files changed, 212 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 677ade5..5c5b5af 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -28,6 +28,7 @@ import 
com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
 import com.twitter.distributedlog.client.resolver.RegionResolver;
 import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.RegionUnavailableException;
 import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
 import com.twitter.distributedlog.exceptions.StreamUnavailableException;
@@ -38,8 +39,8 @@ import 
com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.config.StreamConfigProvider;
 import com.twitter.distributedlog.service.stream.BulkWriteOp;
-import com.twitter.distributedlog.service.stream.CreateOp;
 import com.twitter.distributedlog.service.stream.DeleteOp;
+import com.twitter.distributedlog.service.stream.admin.CreateOp;
 import com.twitter.distributedlog.service.stream.HeartbeatOp;
 import com.twitter.distributedlog.service.stream.ReleaseOp;
 import com.twitter.distributedlog.service.stream.Stream;
@@ -50,8 +51,9 @@ import 
com.twitter.distributedlog.service.stream.StreamManagerImpl;
 import com.twitter.distributedlog.service.stream.StreamOp;
 import com.twitter.distributedlog.service.stream.StreamOpStats;
 import com.twitter.distributedlog.service.stream.TruncateOp;
-import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
 import com.twitter.distributedlog.service.stream.WriteOp;
+import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
+import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
 import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.service.utils.ServerUtils;
@@ -485,8 +487,7 @@ public class DistributedLogServiceImpl implements 
DistributedLogService.ServiceI
     @Override
     public Future<WriteResponse> create(String stream, WriteContext ctx) {
         CreateOp op = new CreateOp(stream, statsLogger, streamManager, 
getChecksum(ctx), featureChecksumDisabled);
-        executeStreamOp(op);
-        return op.result();
+        return executeStreamAdminOp(op);
     }
 
     //
@@ -574,6 +575,15 @@ public class DistributedLogServiceImpl implements 
DistributedLogService.ServiceI
         return ctx.isSetCrc32() ? ctx.getCrc32() : null;
     }
 
+    private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) 
{
+        try {
+            op.preExecute();
+        } catch (DLException dle) {
+            return Future.exception(dle);
+        }
+        return op.execute();
+    }
+
     private void executeStreamOp(final StreamOp op) {
 
         // Must attach this as early as possible--returning before this point 
will cause us to

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
index 82597f3..fbef587 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
@@ -158,15 +158,15 @@ public abstract class AbstractStreamOp<Response> 
implements StreamOp {
     // fail the result with the given response header
     protected abstract void fail(ResponseHeader header);
 
-    protected static OpStatsLogger requestStat(StatsLogger statsLogger, String 
opName) {
+    public static OpStatsLogger requestStat(StatsLogger statsLogger, String 
opName) {
         return requestLogger(statsLogger).getOpStatsLogger(opName);
     }
 
-    protected static StatsLogger requestLogger(StatsLogger statsLogger) {
+    public static StatsLogger requestLogger(StatsLogger statsLogger) {
         return statsLogger.scope("request");
     }
 
-    protected static StatsLogger requestScope(StatsLogger statsLogger, String 
scope) {
+    public static StatsLogger requestScope(StatsLogger statsLogger, String 
scope) {
         return requestLogger(statsLogger).scope(scope);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java
deleted file mode 100644
index 96b8435..0000000
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.service.stream;
-
-import com.twitter.distributedlog.AsyncLogWriter;
-import com.twitter.distributedlog.service.ResponseUtils;
-import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.stats.StatsLogger;
-import scala.runtime.AbstractFunction1;
-
-public class CreateOp extends AbstractWriteOp {
-  private final StreamManager streamManager;
-
-  public CreateOp(String stream,
-                  StatsLogger statsLogger,
-                  StreamManager streamManager,
-                  Long checksum,
-                  Feature checksumEnabledFeature) {
-    super(stream, requestStat(statsLogger, "create"), checksum, 
checksumEnabledFeature);
-    this.streamManager = streamManager;
-  }
-
-  @Override
-  protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
-                                            Sequencer sequencer,
-                                            Object txnLock) {
-    Future<Void> result = streamManager.createStreamAsync(streamName());
-    return result.map(new AbstractFunction1<Void, WriteResponse>() {
-      @Override
-      public WriteResponse apply(Void value) {
-        return ResponseUtils.writeSuccess();
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
new file mode 100644
index 0000000..7ac4986
--- /dev/null
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.stream.admin;
+
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.util.Future;
+
+/**
+ * An admin operation
+ */
+public interface AdminOp<Response> {
+
+    /**
+     * Invoked before the stream op is executed.
+     */
+    void preExecute() throws DLException;
+
+    /**
+     * Execute the operation.
+     *
+     * @return the future represents the response of the operation
+     */
+    Future<Response> execute();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
new file mode 100644
index 0000000..2e1f490
--- /dev/null
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.stream.admin;
+
+import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.stream.StreamManager;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.StatsLogger;
+import scala.runtime.AbstractFunction1;
+
+import static 
com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat;
+
+public class CreateOp extends StreamAdminOp {
+
+  public CreateOp(String stream,
+                  StatsLogger statsLogger,
+                  StreamManager streamManager,
+                  Long checksum,
+                  Feature checksumEnabledFeature) {
+    super(stream,
+            streamManager,
+            requestStat(statsLogger, "create"),
+            checksum,
+            checksumEnabledFeature);
+  }
+
+  @Override
+  protected Future<WriteResponse> executeOp() {
+    Future<Void> result = streamManager.createStreamAsync(stream);
+    return result.map(new AbstractFunction1<Void, WriteResponse>() {
+      @Override
+      public WriteResponse apply(Void value) {
+        return ResponseUtils.writeSuccess();
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/01a7306b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
new file mode 100644
index 0000000..37c6e14
--- /dev/null
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.stream.admin;
+
+import com.google.common.base.Stopwatch;
+import com.twitter.distributedlog.exceptions.ChecksumFailedException;
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.stream.StreamManager;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureTransformer;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stream admin op
+ */
+public abstract class StreamAdminOp implements AdminOp<WriteResponse> {
+
+    protected final String stream;
+    protected final StreamManager streamManager;
+    protected final OpStatsLogger opStatsLogger;
+    protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
+    protected final Long checksum;
+    protected final Feature checksumDisabledFeature;
+
+    protected StreamAdminOp(String stream,
+                            StreamManager streamManager,
+                            OpStatsLogger statsLogger,
+                            Long checksum,
+                            Feature checksumDisabledFeature) {
+        this.stream = stream;
+        this.streamManager = streamManager;
+        this.opStatsLogger = statsLogger;
+        // start here in case the operation is failed before executing.
+        stopwatch.reset().start();
+        this.checksum = checksum;
+        this.checksumDisabledFeature = checksumDisabledFeature;
+    }
+
+    protected Long computeChecksum() {
+        return ProtocolUtils.streamOpCRC32(stream);
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!checksumDisabledFeature.isAvailable() && null != checksum) {
+            Long serverChecksum = computeChecksum();
+            if (null != serverChecksum && !checksum.equals(serverChecksum)) {
+                throw new ChecksumFailedException();
+            }
+        }
+    }
+
+    /**
+     * Execute the operation.
+     *
+     * @return execute operation
+     */
+    protected abstract Future<WriteResponse> executeOp();
+
+    @Override
+    public Future<WriteResponse> execute() {
+        return executeOp().transformedBy(new FutureTransformer<WriteResponse, 
WriteResponse>() {
+
+            @Override
+            public WriteResponse map(WriteResponse response) {
+                opStatsLogger.registerSuccessfulEvent(
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                return response;
+            }
+
+            @Override
+            public WriteResponse handle(Throwable cause) {
+                opStatsLogger.registerFailedEvent(
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                return 
ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
+            }
+
+        });
+    }
+}

Reply via email to