This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/kyy by this push:
     new 62d4b79  use read and write config
62d4b79 is described below

commit 62d4b79c498dd571e1c0f64a12ab563c46968caa
Author: Ring-k <[email protected]>
AuthorDate: Mon Jul 20 16:58:14 2020 +0800

    use read and write config
---
 .../cluster/client/sync/SyncClientAdaptor.java     | 63 ++++++++++++----------
 .../apache/iotdb/cluster/server/RaftServer.java    |  6 +++
 2 files changed, 41 insertions(+), 28 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 42e2a2b..7456fc3 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -103,13 +103,14 @@ public class SyncClientAdaptor {
     return resultRef.get();
   }
 
-  public static Long querySingleSeriesByTimestamp(AsyncDataClient client, 
SingleSeriesQueryRequest request)
+  public static Long querySingleSeriesByTimestamp(AsyncDataClient client,
+      SingleSeriesQueryRequest request)
       throws TException, InterruptedException {
     AtomicReference<Long> result = new AtomicReference<>();
     GenericHandler<Long> handler = new GenericHandler<>(client.getNode(), 
result);
     synchronized (result) {
       client.querySingleSeriesByTimestamp(request, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return result.get();
   }
@@ -130,7 +131,7 @@ public class SyncClientAdaptor {
 
     synchronized (result) {
       client.querySingleSeries(request, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return result.get();
   }
@@ -143,7 +144,7 @@ public class SyncClientAdaptor {
     handler.setContact(client.getNode());
     synchronized (response) {
       client.getNodeList(header, schemaPattern, level, handler);
-      response.wait(RaftServer.getConnectionTimeoutInMS());
+      response.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return response.get();
   }
@@ -156,7 +157,7 @@ public class SyncClientAdaptor {
     handler.setContact(client.getNode());
     synchronized (response) {
       client.getChildNodePathInNextLevel(header, path, handler);
-      response.wait(RaftServer.getConnectionTimeoutInMS());
+      response.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return response.get();
   }
@@ -175,7 +176,7 @@ public class SyncClientAdaptor {
       plan.serialize(dataOutputStream);
       client.getAllMeasurementSchema(header, 
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
           handler);
-      response.wait(RaftServer.getConnectionTimeoutInMS());
+      response.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return response.get();
   }
@@ -186,7 +187,7 @@ public class SyncClientAdaptor {
     GenericHandler<TNodeStatus> handler = new 
GenericHandler<>(client.getNode(), resultRef);
     synchronized (resultRef) {
       client.queryNodeStatus(handler);
-      resultRef.wait(RaftServer.getConnectionTimeoutInMS());
+      resultRef.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return resultRef.get();
   }
@@ -197,12 +198,13 @@ public class SyncClientAdaptor {
     GenericHandler<CheckStatusResponse> handler = new 
GenericHandler<>(client.getNode(), resultRef);
     synchronized (resultRef) {
       client.checkStatus(startUpStatus, handler);
-      resultRef.wait(RaftServer.getConnectionTimeoutInMS());
+      resultRef.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return resultRef.get();
   }
 
-  public static AddNodeResponse addNode(AsyncMetaClient client, Node thisNode, 
StartUpStatus startUpStatus)
+  public static AddNodeResponse addNode(AsyncMetaClient client, Node thisNode,
+      StartUpStatus startUpStatus)
       throws TException, InterruptedException {
     JoinClusterHandler handler = new JoinClusterHandler();
     AtomicReference<AddNodeResponse> response = new AtomicReference(null);
@@ -220,9 +222,10 @@ public class SyncClientAdaptor {
       PullSchemaRequest pullSchemaRequest) throws TException, 
InterruptedException {
     AtomicReference<List<MeasurementSchema>> timeseriesSchemas = new 
AtomicReference<>();
     synchronized (timeseriesSchemas) {
-      client.pullTimeSeriesSchema(pullSchemaRequest, new 
PullTimeseriesSchemaHandler(client.getNode(),
-          pullSchemaRequest.getPrefixPaths(), timeseriesSchemas));
-      timeseriesSchemas.wait(RaftServer.getConnectionTimeoutInMS());
+      client
+          .pullTimeSeriesSchema(pullSchemaRequest, new 
PullTimeseriesSchemaHandler(client.getNode(),
+              pullSchemaRequest.getPrefixPaths(), timeseriesSchemas));
+      timeseriesSchemas.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return timeseriesSchemas.get();
   }
@@ -230,32 +233,35 @@ public class SyncClientAdaptor {
   public static List<ByteBuffer> getAggrResult(AsyncDataClient client, 
GetAggrResultRequest request)
       throws TException, InterruptedException {
     AtomicReference<List<ByteBuffer>> resultReference = new 
AtomicReference<>();
-    GenericHandler<List<ByteBuffer>> handler = new 
GenericHandler<>(client.getNode(), resultReference);
+    GenericHandler<List<ByteBuffer>> handler = new 
GenericHandler<>(client.getNode(),
+        resultReference);
     synchronized (resultReference) {
       resultReference.set(null);
       client.getAggrResult(request, handler);
-      resultReference.wait(RaftServer.getConnectionTimeoutInMS());
+      resultReference.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return resultReference.get();
   }
 
-  public static List<String> getUnregisteredMeasurements(AsyncDataClient 
client, Node header, List<String> seriesPaths) throws TException, 
InterruptedException {
+  public static List<String> getUnregisteredMeasurements(AsyncDataClient 
client, Node header,
+      List<String> seriesPaths) throws TException, InterruptedException {
     AtomicReference<List<String>> remoteResult = new AtomicReference<>();
     GenericHandler<List<String>> handler = new 
GenericHandler<>(client.getNode(), remoteResult);
     synchronized (remoteResult) {
       client.getUnregisteredTimeseries(header, seriesPaths, handler);
-      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+      remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return remoteResult.get();
   }
 
-  public static List<String> getAllPaths(AsyncDataClient client, Node header, 
List<String> pathsToQuery)
+  public static List<String> getAllPaths(AsyncDataClient client, Node header,
+      List<String> pathsToQuery)
       throws InterruptedException, TException {
     AtomicReference<List<String>> remoteResult = new AtomicReference<>();
     GenericHandler<List<String>> handler = new 
GenericHandler<>(client.getNode(), remoteResult);
     synchronized (remoteResult) {
       client.getAllPaths(header, pathsToQuery, handler);
-      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+      remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return remoteResult.get();
   }
@@ -267,7 +273,7 @@ public class SyncClientAdaptor {
     GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), 
remoteResult);
     synchronized (remoteResult) {
       client.getPathCount(header, pathsToQuery, level, handler);
-      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+      remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return remoteResult.get();
   }
@@ -279,7 +285,7 @@ public class SyncClientAdaptor {
     GenericHandler<Set<String>> handler = new 
GenericHandler<>(client.getNode(), remoteResult);
     synchronized (remoteResult) {
       client.getAllDevices(header, pathsToQuery, handler);
-      remoteResult.wait(RaftServer.getConnectionTimeoutInMS());
+      remoteResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return remoteResult.get();
   }
@@ -291,7 +297,7 @@ public class SyncClientAdaptor {
     synchronized (result) {
       result.set(null);
       client.getGroupByExecutor(request, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return result.get();
   }
@@ -322,7 +328,7 @@ public class SyncClientAdaptor {
 
     synchronized (status) {
       client.executeNonQueryPlan(req, new ForwardPlanHandler(status, plan, 
receiver));
-      status.wait(RaftServer.getConnectionTimeoutInMS());
+      status.wait(RaftServer.getWriteOperationTimeoutMS());
     }
 
     return status.get();
@@ -335,19 +341,20 @@ public class SyncClientAdaptor {
     GenericHandler<ByteBuffer> handler = new 
GenericHandler<>(client.getNode(), result);
     synchronized (result) {
       client.readFile(remotePath, offset, fetchSize, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getWriteOperationTimeoutMS());
     }
     return result.get();
   }
 
-  public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node 
header, long executorId
+  public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node 
header,
+      long executorId
       , long curStartTime, long curEndTime) throws InterruptedException, 
TException {
     AtomicReference<List<ByteBuffer>> fetchResult = new AtomicReference<>();
     GenericHandler<List<ByteBuffer>> handler = new 
GenericHandler<>(client.getNode(), fetchResult);
     synchronized (fetchResult) {
       fetchResult.set(null);
       client.getGroupByResult(header, executorId, curStartTime, curEndTime, 
handler);
-      fetchResult.wait(RaftServer.getConnectionTimeoutInMS());
+      fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return fetchResult.get();
   }
@@ -359,7 +366,7 @@ public class SyncClientAdaptor {
     synchronized (snapshotRef) {
       client.pullSnapshot(request, new PullSnapshotHandler<>(snapshotRef,
           client.getNode(), slots, factory));
-      snapshotRef.wait(RaftServer.getConnectionTimeoutInMS());
+      snapshotRef.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return snapshotRef.get();
   }
@@ -373,7 +380,7 @@ public class SyncClientAdaptor {
         context.getQueryId(), deviceMeasurements, header, client.getNode());
     synchronized (result) {
       client.last(request, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getReadOperationTimeoutMS());
     }
     return result.get();
   }
@@ -384,7 +391,7 @@ public class SyncClientAdaptor {
     GenericHandler<Boolean> handler = new GenericHandler<>(client.getNode(), 
result);
     synchronized (result) {
       client.onSnapshotApplied(header, slots, handler);
-      result.wait(RaftServer.getConnectionTimeoutInMS());
+      result.wait(RaftServer.getWriteOperationTimeoutMS());
     }
     return result.get();
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 42986b8..79f37ca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -58,6 +58,8 @@ public abstract class RaftServer implements 
RaftService.AsyncIface, RaftService.
       ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
   private static int readOperationTimeoutMS =
       ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
+  private static int writeOperationTimeoutMS =
+      ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
   private static int syncLeaderMaxWaitMs = 20 * 1000;
   private static long heartBeatIntervalMs = 1000L;
 
@@ -97,6 +99,10 @@ public abstract class RaftServer implements 
RaftService.AsyncIface, RaftService.
     return readOperationTimeoutMS;
   }
 
+  public static int getWriteOperationTimeoutMS() {
+    return writeOperationTimeoutMS;
+  }
+
   public static int getSyncLeaderMaxWaitMs() {
     return syncLeaderMaxWaitMs;
   }

Reply via email to