This is an automated email from the ASF dual-hosted git repository. yuyuankang pushed a commit to branch kyy in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e4f19e52c55ba2171ed939ce2b40da13c5c299fa Author: Ring-k <[email protected]> AuthorDate: Thu Jun 18 15:50:29 2020 +0800 auto schema creation --- .../cluster/client/sync/SyncClientAdaptor.java | 10 ++ .../iotdb/cluster/query/ClusterPlanExecutor.java | 21 ++-- .../iotdb/cluster/server/DataClusterServer.java | 7 ++ .../cluster/server/member/DataGroupMember.java | 37 ++++-- .../cluster/server/member/MetaGroupMember.java | 132 ++++++++++++++++++++- .../iotdb/cluster/server/member/RaftMember.java | 2 +- service-rpc/src/main/thrift/cluster.thrift | 2 + 7 files changed, 188 insertions(+), 23 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java index be44fc3..a00fbc1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java @@ -242,6 +242,16 @@ public class SyncClientAdaptor { return resultReference.get(); } + public static Map<String, Boolean> getUnregisteredMeasurements(DataClient client, Node header, List<String> seriesPaths) throws TException, InterruptedException { + AtomicReference<Map<String, Boolean>> remoteResult = new AtomicReference<>(); + GenericHandler<Map<String, Boolean>> handler = new GenericHandler<>(client.getNode(), remoteResult); + synchronized (remoteResult) { + client.isMeasurementsRegistered(header, seriesPaths, handler); + remoteResult.wait(RaftServer.getConnectionTimeoutInMS()); + } + return remoteResult.get(); + } + public static List<String> getAllPaths(DataClient client, Node header, List<String> pathsToQuery) throws InterruptedException, TException { AtomicReference<List<String>> remoteResult = new AtomicReference<>(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java index 94b3693..8376fe2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java @@ -97,8 +97,8 @@ public class ClusterPlanExecutor extends PlanExecutor { @Override public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context) - throws IOException, StorageEngineException, QueryFilterOptimizationException, QueryProcessException, - MetadataException { + throws IOException, StorageEngineException, QueryFilterOptimizationException, QueryProcessException, + MetadataException { if (queryPlan instanceof QueryPlan) { logger.debug("Executing a query: {}", queryPlan); return processDataQuery((QueryPlan) queryPlan, context); @@ -155,7 +155,7 @@ public class ClusterPlanExecutor extends PlanExecutor { * * @param sgPathMap the key is the storage group name and the value is the path to be queried with * storage group added - * @param level the max depth to match the pattern, -1 means matching the whole pattern + * @param level the max depth to match the pattern, -1 means matching the whole pattern * @return the number of paths that match the pattern at given level * @throws MetadataException */ @@ -168,7 +168,8 @@ public class ClusterPlanExecutor extends PlanExecutor { String storageGroupName = sgPathEntry.getKey(); String pathUnderSG = sgPathEntry.getValue(); // find the data group that should hold the timeseries schemas of the storage group - PartitionGroup partitionGroup = metaGroupMember.getPartitionTable().route(storageGroupName, 0); + PartitionGroup partitionGroup = metaGroupMember.getPartitionTable() + .route(storageGroupName, 0); if (partitionGroup.contains(metaGroupMember.getThisNode())) { // this node is a member of the group, perform a local query after synchronizing with the // leader @@ -234,7 +235,8 @@ public class ClusterPlanExecutor extends PlanExecutor { return localResult; } - private int getRemotePathCount(PartitionGroup partitionGroup, List<String> pathsToQuery, int level) + private int getRemotePathCount(PartitionGroup partitionGroup, List<String> pathsToQuery, + int level) throws MetadataException { // choose the node with lowest latency or highest throughput List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup); @@ -308,7 +310,8 @@ public class ClusterPlanExecutor extends PlanExecutor { return MManager.getInstance().getNodesList(schemaPattern, level, new SlotSgFilter(metaGroupMember.getPartitionTable().getNodeSlots(header))); } catch (MetadataException e) { - logger.error("Cannot not get node list of {}@{} from {} locally", schemaPattern, level, group); + logger + .error("Cannot not get node list of {}@{} from {} locally", schemaPattern, level, group); return Collections.emptyList(); } } @@ -570,7 +573,7 @@ public class ClusterPlanExecutor extends PlanExecutor { @Override protected AlignByDeviceDataSet getAlignByDeviceDataSet(AlignByDevicePlan plan, - QueryContext context, IQueryRouter router) + QueryContext context, IQueryRouter router) throws MetadataException { return new ClusterAlignByDeviceDataSet(plan, context, router, metaGroupMember); } @@ -588,8 +591,8 @@ public class ClusterPlanExecutor extends PlanExecutor { break; default: throw new QueryProcessException(String - .format("Unrecognized load configuration plan type: %s", - plan.getLoadConfigurationPlanType())); + .format("Unrecognized load configuration plan type: %s", + plan.getLoadConfigurationPlanType())); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index 705ecb4..4876e2f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -384,6 +384,13 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } @Override + public void isMeasurementsRegistered(Node header, List<String> measurements, AsyncMethodCallback<Map<String, Boolean>> resultHandler) throws TException { + DataGroupMember dataMember = getDataMember(header, resultHandler, + "Check if measurements are registered"); + dataMember.isMeasurementsRegistered(header,measurements, resultHandler); + } + + @Override public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) { DataGroupMember dataMember = getDataMember(request.getHeader(), resultHandler, request); dataMember.getGroupByExecutor(request, resultHandler); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index a2a174b..91e92f2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -98,7 +98,9 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.qp.executor.IPlanExecutor; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; @@ -120,6 +122,7 @@ import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.db.utils.TestOnly; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -998,7 +1001,6 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf return status; } } - return forwardPlan(plan, leader, getHeader()); } @@ -1062,8 +1064,8 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf /** * Create an IPointReader of "path" with “timeFilter” and "valueFilter". A synchronization with - * the leader will be performed first to preserve strong consistency. - * TODO-Cluster: also support weak consistency + * the leader will be performed first to preserve strong consistency. TODO-Cluster: also support + * weak consistency * * @param path * @param dataType @@ -1089,8 +1091,8 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf /** * Create an IBatchReader of "path" with “timeFilter” and "valueFilter". A synchronization with - * the leader will be performed first to preserve strong consistency. - * TODO-Cluster: also support weak consistency + * the leader will be performed first to preserve strong consistency. TODO-Cluster: also support + * weak consistency * * @param path * @param dataType @@ -1144,8 +1146,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf /** * Create an IReaderByTimestamp of "path". A synchronization with the leader will be performed - * first to preserve strong consistency. - * TODO-Cluster: also support weak consistency + * first to preserve strong consistency. TODO-Cluster: also support weak consistency * * @param path * @param dataType @@ -1583,6 +1584,28 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf resultHandler.onComplete(resultBuffers); } + @Override + public void isMeasurementsRegistered(Node header, List<String> measurements, + AsyncMethodCallback<Map<String, Boolean>> resultHandler) throws TException { + if (!syncLeader()) { + resultHandler.onError(new LeaderUnknownException(getAllNodes())); + return; + } + Map<String, Boolean> result = new HashMap<>(); + for (String seriesPath : measurements) { + try { + List<String> path = MManager.getInstance().getAllTimeseriesName(seriesPath); + if (path.size() != 1) { + throw new MetadataException("Size of the path is not 1."); + } + result.put(seriesPath, true); + } catch (MetadataException e) { + result.put(seriesPath, false); + } + } + resultHandler.onComplete(result); + } + /** * Execute "aggregation" over "path" with "timeFilter". This method currently requires strong * consistency. Only data managed by this group will be used for aggregation. diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 8edd6ac..ffd0c6c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -126,9 +126,11 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory; import org.apache.iotdb.cluster.utils.PartitionUtils; import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals; import org.apache.iotdb.cluster.utils.StatusUtils; +import org.apache.iotdb.cluster.utils.nodetool.function.Partition; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.authorizer.IAuthorizer; import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; @@ -139,9 +141,13 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.metadata.MetaUtils; import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode; import org.apache.iotdb.db.qp.executor.PlanExecutor; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.context.QueryContext; @@ -152,15 +158,22 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.db.utils.SerializeUtils; import org.apache.iotdb.db.utils.TestOnly; +import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.StringContainer; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; @@ -1520,8 +1533,27 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf } catch (MetadataException e) { logger.error("Cannot route plan {}", plan, e); } - // the storage group is not found locally, forward it to the leader + // the storage group is not found locally if (planGroupMap == null || planGroupMap.isEmpty()) { + if (plan instanceof InsertPlan && IoTDBDescriptor.getInstance().getConfig() + .isAutoCreateSchemaEnabled()) { + System.out.println("try to set storage group"); + String deviceId = ((InsertPlan) plan).getDeviceId(); + try { + String storageGroupName = MetaUtils + .getStorageGroupNameByLevel(deviceId, IoTDBDescriptor.getInstance() + .getConfig().getDefaultStorageGroupLevel()); + SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan( + new Path(storageGroupName)); + TSStatus setStorageGroupResult = executeNonQuery(setStorageGroupPlan); + if (setStorageGroupResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new MetadataException("Failed to set storage group " + storageGroupName); + } + return executeNonQuery(plan); + } catch (MetadataException e) { + logger.info("Failed to set storage group of device id {}", deviceId); + } + } logger.debug("{}: Cannot found storage groups for {}", name, plan); return StatusUtils.NO_STORAGE_GROUP; } @@ -1551,6 +1583,19 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf subStatus = forwardPlan(entry.getKey(), entry.getValue()); } if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + if (entry.getKey() instanceof InsertPlan + && subStatus.getCode() == TSStatusCode.STORAGE_ENGINE_ERROR.getStatusCode() + && IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) { + boolean hasCreate = autoCreateTimeseries((InsertPlan) entry.getKey(), entry.getValue()); + if (hasCreate) { + Map<PhysicalPlan, PartitionGroup> subPlan = new HashMap<>(); + subPlan.put(entry.getKey(), entry.getValue()); + subStatus = forwardPlan(subPlan); + continue; + } else { + logger.error("{}, Cannot auto create timeseries.", thisNode); + } + } // execution failed, record the error message errorCodePartitionGroups.add(String.format("[%s@%s:%s]", subStatus.getCode(), entry.getValue().getHeader(), @@ -1569,6 +1614,77 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf return status; } + boolean autoCreateTimeseries(InsertPlan insertPlan, PartitionGroup partitionGroup) { + List<String> seriesList = new ArrayList<>(); + String deviceId = insertPlan.getDeviceId(); + for (String measurementId : insertPlan.getMeasurements()) { + seriesList.add( + new StringContainer(new String[]{deviceId, measurementId}, TsFileConstant.PATH_SEPARATOR) + .toString()); + } + List<String> unregistedSeriesList = getUnregisteredSeriesList(seriesList, partitionGroup); + for (String seriesPath : unregistedSeriesList) { + int index = seriesList.indexOf(seriesPath); + TSDataType dataType = TypeInferenceUtils + .getPredictedDataType(insertPlan.getValues()[index], true); + TSEncoding encoding = getDefaultEncoding(dataType); + CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor(); + CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath), + dataType, encoding, compressionType, null, null, null, null); + TSStatus result = executeNonQuery(createTimeSeriesPlan); + if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + logger.error("{} failed to execute create timeseries {}", thisNode, seriesPath); + return false; + } + } + return true; + } + + private TSEncoding getDefaultEncoding(TSDataType dataType) { + IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); + switch (dataType) { + case BOOLEAN: + return conf.getDefaultBooleanEncoding(); + case INT32: + return conf.getDefaultInt32Encoding(); + case INT64: + return conf.getDefaultInt64Encoding(); + case FLOAT: + return conf.getDefaultFloatEncoding(); + case DOUBLE: + return conf.getDefaultDoubleEncoding(); + case TEXT: + return conf.getDefaultTextEncoding(); + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataType.toString())); + } + } + + List<String> getUnregisteredSeriesList(List<String> seriesList, PartitionGroup partitionGroup) { + Set<String> unregistered = new HashSet<>(); + for (Node node : partitionGroup) { + try { + DataClient client = getDataClient(node); + Map<String, Boolean> result = SyncClientAdaptor + .getUnregisteredMeasurements(client, partitionGroup.getHeader(), seriesList); + for (Map.Entry<String, Boolean> entry : result.entrySet()) { + if (!entry.getValue()) { + unregistered.add(entry.getKey()); + } + } + } catch (TException | IOException e) { + logger.error("{}: cannot getting unregistered series list {} from {}", name, + Arrays.toString(seriesList.toArray(new String[0])), node, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("{}: getting unregistered series list {} is interrupted from {}", name, + Arrays.toString(seriesList.toArray(new String[0])), node, e); + } + } + return new ArrayList<String>(unregistered); + } + /** * Forward a plan to the DataGroupMember of one node in the group. Only when all nodes time out, * will a TIME_OUT be returned. @@ -1700,7 +1816,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf if (logger.isDebugEnabled()) { logger.debug("{}: Pulled {} timeseries schemas of {} and other {} paths from {} of {}", name, - schemas.size(), prefixPaths.get(0), prefixPaths.size() - 1, node, partitionGroup.getHeader()); + schemas.size(), prefixPaths.get(0), prefixPaths.size() - 1, node, + partitionGroup.getHeader()); } results.addAll(schemas); break; @@ -1718,7 +1835,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf * @return * @throws MetadataException */ - public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPath(List<Path> paths, List<String> aggregations) throws + public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPath(List<Path> paths, + List<String> aggregations) throws MetadataException { try { // try locally first @@ -1761,7 +1879,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf MeasurementSchema schema = schemas.get(i); columnType.add(schema.getType()); MManager.getInstance().cacheSchema(paths.get(i).getDevice() + - IoTDBConstant.PATH_SEPARATOR + schema.getMeasurementId(), schema); + IoTDBConstant.PATH_SEPARATOR + schema.getMeasurementId(), schema); } else { columnType.add(dataType); } @@ -1781,7 +1899,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf * @return * @throws MetadataException */ - public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByString(List<String> pathStrs, String aggregation) throws + public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByString(List<String> pathStrs, + String aggregation) throws MetadataException { try { // try locally first @@ -1793,7 +1912,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf } else { // if the aggregation function is not null, // we should recalculate the type of column in result set - List<TSDataType> columnDataTypes = SchemaUtils.getSeriesTypesByString(pathStrs, aggregation); + List<TSDataType> columnDataTypes = SchemaUtils + .getSeriesTypesByString(pathStrs, aggregation); return new Pair<>(columnDataTypes, measurementDataTypes); } } catch (PathNotExistException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 5fd6189..14ec490 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -78,6 +78,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.thrift.TException; @@ -980,7 +981,6 @@ public abstract class RaftMember implements RaftService.AsyncIface { if (readOnly) { return StatusUtils.NODE_READ_ONLY; } - PhysicalPlanLog log = new PhysicalPlanLog(); // assign term and index to the new log and append it synchronized (logManager) { diff --git a/service-rpc/src/main/thrift/cluster.thrift b/service-rpc/src/main/thrift/cluster.thrift index b8b7daf..39d59ae 100644 --- a/service-rpc/src/main/thrift/cluster.thrift +++ b/service-rpc/src/main/thrift/cluster.thrift @@ -350,6 +350,8 @@ service TSDataService extends RaftService { list<binary> getAggrResult(1:GetAggrResultRequest request) + map<string, bool> isMeasurementsRegistered(1: Node header, 2: list<string> measurements) + PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request) /**
