This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_overflow by this push:
     new 738a6d6  temporary commit
738a6d6 is described below

commit 738a6d6a6628711cce1f778caf9b7b79cffea45b
Author: xiangdong huang <[email protected]>
AuthorDate: Thu Apr 25 08:18:07 2019 +0800

    temporary commit
---
 .../db/engine/overflowdata/OverflowProcessor.java  |  11 +
 .../db/engine/sgmanager/StorageGroupProcessor.java | 494 +++++++++++----------
 2 files changed, 259 insertions(+), 246 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
index 6f460a6..6c31247 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
@@ -20,14 +20,24 @@
 package org.apache.iotdb.db.engine.overflowdata;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.bufferwrite.Action;
+import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
+import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.sgmanager.OperationResult;
 import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 
 public class OverflowProcessor extends TsFileProcessor {
@@ -69,4 +79,5 @@ public class OverflowProcessor extends TsFileProcessor {
     return null;
   }
 
+
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index 4cc0d6a..70d2ec0 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -1,246 +1,248 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
-//
-//package org.apache.iotdb.db.engine.sgmanager;
-//
-//import java.io.File;
-//import java.io.FileInputStream;
-//import java.io.IOException;
-//import java.util.ArrayList;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.concurrent.Future;
-//import java.util.concurrent.atomic.AtomicLong;
-//import org.apache.iotdb.db.conf.IoTDBConstant;
-//import org.apache.iotdb.db.conf.IoTDBDescriptor;
-//import org.apache.iotdb.db.engine.Processor;
-//import org.apache.iotdb.db.engine.bufferwrite.Action;
-//import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
-//import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStore;
-//import org.apache.iotdb.db.engine.filenode.TsFileResource;
-//import org.apache.iotdb.db.engine.modification.Modification;
-//import org.apache.iotdb.db.engine.overflowdata.OverflowProcessor;
-//import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
-//import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
-//import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-//import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-//import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
-//import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
-//import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
-//import org.apache.iotdb.db.engine.version.VersionController;
-//import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-//import org.apache.iotdb.db.exception.FileNodeManagerException;
-//import org.apache.iotdb.db.exception.FileNodeProcessorException;
-//import org.apache.iotdb.db.exception.ProcessorException;
-//import org.apache.iotdb.db.metadata.MManager;
-//import org.apache.iotdb.db.monitor.IStatistic;
-//import org.apache.iotdb.db.monitor.MonitorConstants;
-//import org.apache.iotdb.db.monitor.StatMonitor;
-//import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-//import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-//import org.apache.iotdb.db.query.context.QueryContext;
-//import org.apache.iotdb.db.utils.QueryUtils;
-//import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-//import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-//import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-//import org.apache.iotdb.tsfile.read.common.Path;
-//import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-//import org.apache.iotdb.tsfile.utils.Pair;
-//import org.apache.iotdb.tsfile.write.record.TSRecord;
-//import org.apache.iotdb.tsfile.write.schema.FileSchema;
-//import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//public class StorageGroupProcessor extends Processor implements IStatistic {
-//
-//  private static final Logger LOGGER = 
LoggerFactory.getLogger(StorageGroupProcessor.class);
-//  private static final String RESTORE_FILE_SUFFIX = ".restore";
-//
-//  TsFileProcessor tsFileProcessor;
-//  OverflowProcessor overflowProcessor;
-//  private FileNodeProcessorStore fileNodeProcessorStore;
-//  String fileNodeProcessorStoreFilePath;
-//
-//  //the version controller is shared by tsfile and overflow processor.
-//  private VersionController versionController;
-//
-//  private FileSchema fileSchema;
-//
-//  Action beforeFlushAction = () -> {};
-//  Action afterFlushAction = () -> {};
-//  Action afterCloseAction = () -> {};
-//
-//  /**
-//   * Construct processor using name space seriesPath
-//   */
-//  public StorageGroupProcessor(String processorName)
-//      throws IOException, BufferWriteProcessorException, 
WriteProcessException, FileNodeProcessorException {
-//    super(processorName);
-//
-//    this.fileSchema = constructFileSchema(processorName);
-//
-//    File restoreFolder = new 
File(IoTDBDescriptor.getInstance().getConfig().getFileNodeDir(),
-//        processorName);
-//    if (!restoreFolder.exists()) {
-//      restoreFolder.mkdirs();
-//      LOGGER.info("The restore directory of the filenode processor {} 
doesn't exist. Create new " +
-//              "directory {}",
-//          getProcessorName(), restoreFolder.getAbsolutePath());
-//    }
-//    versionController = new 
SimpleFileVersionController(restoreFolder.getAbsolutePath());
-//    tsFileProcessor = new TsFileProcessor(processorName, beforeFlushAction, 
afterFlushAction,
-//        afterCloseAction, versionController, fileSchema);
-//    overflowProcessor = new OverflowProcessor(processorName, 
beforeFlushAction, afterFlushAction,
-//        afterCloseAction, versionController, fileSchema);
-//
-//    fileNodeProcessorStoreFilePath = new File(restoreFolder, processorName + 
RESTORE_FILE_SUFFIX)
-//        .getPath();
-//    try {
-//      readStoreFromDisk();
-//    } catch (FileNodeProcessorException | IOException e) {
-//      LOGGER.error(
-//          "The fileNode processor {} encountered an error when recoverying 
restore " +
-//              "information.", processorName);
-//      throw new FileNodeProcessorException(e);
-//    }
-//
-//    // RegistStatService
-//    if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
-//      String statStorageDeltaName =
-//          MonitorConstants.STAT_STORAGE_GROUP_PREFIX + 
MonitorConstants.MONITOR_PATH_SEPARATOR
-//              + MonitorConstants.FILE_NODE_PATH + 
MonitorConstants.MONITOR_PATH_SEPARATOR
-//              + processorName.replaceAll("\\.", "_");
-//      StatMonitor statMonitor = StatMonitor.getInstance();
-//      registerStatMetadata();
-//      statMonitor.registerStatistics(statStorageDeltaName, this);
-//    }
-//  }
-//
-//  public OperationResult insert(InsertPlan insertPlan) throws IOException, 
BufferWriteProcessorException {
-//    OperationResult result = tsFileProcessor.insert(insertPlan);
-//    if (result == OperationResult.WRITE_REJECT_BY_TIME) {
-//      result = overflowProcessor.insert(insertPlan);
-//    }
-//    return result;
-//  }
-//
-//  public void update(UpdatePlan plan) {
-//    overflowProcessor.update(plan);
-//  }
-//
-//  public void delete(String device, String measurementId, long timestamp) 
throws IOException {
-//    tsFileProcessor.delete(device, measurementId, timestamp);
-//  }
-//
-//  /**
-//   * query data.
-//   */
-//  public QueryDataSource query(SingleSeriesExpression seriesExpression, 
QueryContext context)
-//      throws FileNodeManagerException, IOException {
-//    QueryDataSource tsfileData = tsFileProcessor.query(seriesExpression, 
context);
-//    QueryDataSource overflowData = overflowProcessor.query(seriesExpression, 
context)
-//  }
-//
-//
-//  public void addExternalTsFile() {
-//
-//  }
-//
-//  public void addExternalOverflowFile() {
-//
-//  }
-//
-//
-//
-//
-//  @Override
-//  public boolean canBeClosed() {
-//    return false;
-//  }
-//
-//  @Override
-//  public Future<Boolean> flush() throws IOException {
-//    return null;
-//  }
-//
-//  @Override
-//  public void close() throws ProcessorException {
-//
-//  }
-//
-//  @Override
-//  public long memoryUsage() {
-//    return 0;
-//  }
-//
-//
-//
-//  @Override
-//  public Map<String, TSRecord> getAllStatisticsValue() {
-//    return null;
-//  }
-//
-//  @Override
-//  public void registerStatMetadata() {
-//
-//  }
-//
-//  @Override
-//  public List<String> getAllPathForStatistic() {
-//    return null;
-//  }
-//
-//  @Override
-//  public Map<String, AtomicLong> getStatParamsHashMap() {
-//    return null;
-//  }
-//
-//  private FileSchema constructFileSchema(String processorName) throws 
WriteProcessException {
-//    List<MeasurementSchema> columnSchemaList;
-//    columnSchemaList = 
MManager.getInstance().getSchemaForFileName(processorName);
-//
-//    FileSchema schema = new FileSchema();
-//    for (MeasurementSchema measurementSchema : columnSchemaList) {
-//      schema.registerMeasurement(measurementSchema);
-//    }
-//    return schema;
-//
-//  }
-//
-//
-//  private void readStoreFromDisk()
-//      throws FileNodeProcessorException, IOException {
-//    //only be used when recorvery, and at this time, there is no write 
operations.
-//    File restoreFile = new File(fileNodeProcessorStoreFilePath);
-//    if (!restoreFile.exists() || restoreFile.length() == 0) {
-//      fileNodeProcessorStore = new FileNodeProcessorStore(false, new 
HashMap<>(),
-//          new TsFileResource(null, false),
-//          new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
-//    }
-//    try (FileInputStream inputStream = new 
FileInputStream(fileNodeProcessorStoreFilePath)) {
-//      fileNodeProcessorStore = 
FileNodeProcessorStore.deSerialize(inputStream);
-//    } catch (IOException e) {
-//      LOGGER.error("Failed to deserialize the FileNodeRestoreFile {}, {}",
-//              fileNodeProcessorStoreFilePath, e);
-//      throw new FileNodeProcessorException(e);
-//    }
-//  }
-//}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.sgmanager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.Processor;
+import org.apache.iotdb.db.engine.bufferwrite.Action;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStore;
+import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.overflowdata.OverflowProcessor;
+import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
+import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
+import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.FileNodeProcessorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StorageGroupProcessor extends Processor implements IStatistic {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StorageGroupProcessor.class);
+  private static final String RESTORE_FILE_SUFFIX = ".restore";
+
+  TsFileProcessor tsFileProcessor;
+  OverflowProcessor overflowProcessor;
+  private FileNodeProcessorStore fileNodeProcessorStore;
+  String fileNodeProcessorStoreFilePath;
+
+  //the version controller is shared by tsfile and overflow processor.
+  private VersionController versionController;
+
+  private FileSchema fileSchema;
+
+  Action beforeFlushAction = () -> {};
+  Action afterFlushAction = () -> {};
+  Action afterCloseAction = () -> {};
+
+  /**
+   * Construct processor using name space seriesPath
+   */
+  public StorageGroupProcessor(String processorName)
+      throws IOException, BufferWriteProcessorException, 
WriteProcessException, FileNodeProcessorException {
+    super(processorName);
+
+    this.fileSchema = constructFileSchema(processorName);
+
+    File restoreFolder = new 
File(IoTDBDescriptor.getInstance().getConfig().getFileNodeDir(),
+        processorName);
+    if (!restoreFolder.exists()) {
+      restoreFolder.mkdirs();
+      LOGGER.info("The restore directory of the filenode processor {} doesn't 
exist. Create new " +
+              "directory {}",
+          getProcessorName(), restoreFolder.getAbsolutePath());
+    }
+    versionController = new 
SimpleFileVersionController(restoreFolder.getAbsolutePath());
+    tsFileProcessor = new TsFileProcessor(processorName, beforeFlushAction, 
afterFlushAction,
+        afterCloseAction, versionController, fileSchema);
+    overflowProcessor = new OverflowProcessor(processorName, 
beforeFlushAction, afterFlushAction,
+        afterCloseAction, versionController, fileSchema);
+
+    fileNodeProcessorStoreFilePath = new File(restoreFolder, processorName + 
RESTORE_FILE_SUFFIX)
+        .getPath();
+    try {
+      readStoreFromDisk();
+    } catch (FileNodeProcessorException | IOException e) {
+      LOGGER.error(
+          "The fileNode processor {} encountered an error when recoverying 
restore " +
+              "information.", processorName);
+      throw new FileNodeProcessorException(e);
+    }
+
+    // RegistStatService
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+      String statStorageDeltaName =
+          MonitorConstants.STAT_STORAGE_GROUP_PREFIX + 
MonitorConstants.MONITOR_PATH_SEPARATOR
+              + MonitorConstants.FILE_NODE_PATH + 
MonitorConstants.MONITOR_PATH_SEPARATOR
+              + processorName.replaceAll("\\.", "_");
+      StatMonitor statMonitor = StatMonitor.getInstance();
+      registerStatMetadata();
+      statMonitor.registerStatistics(statStorageDeltaName, this);
+    }
+  }
+
+  public OperationResult insert(InsertPlan insertPlan) throws IOException, 
BufferWriteProcessorException {
+    OperationResult result = tsFileProcessor.insert(insertPlan);
+    if (result == OperationResult.WRITE_REJECT_BY_TIME) {
+      result = overflowProcessor.insert(insertPlan);
+    }
+    return result;
+  }
+
+  public void update(UpdatePlan plan) {
+    overflowProcessor.update(plan);
+  }
+
+  public void delete(String device, String measurementId, long timestamp) 
throws IOException {
+    tsFileProcessor.delete(device, measurementId, timestamp);
+  }
+
+  /**
+   * query data.
+   */
+  public QueryDataSource query(SingleSeriesExpression seriesExpression, 
QueryContext context)
+      throws FileNodeManagerException, IOException {
+    GlobalSortedSeriesDataSource tsfileData = 
tsFileProcessor.query(seriesExpression, context);
+    GlobalSortedSeriesDataSource overflowData = 
overflowProcessor.query(seriesExpression, context);
+    return null;
+  }
+
+
+  public void addExternalTsFile() {
+
+  }
+
+  public void addExternalOverflowFile() {
+
+  }
+
+
+
+
+  @Override
+  public boolean canBeClosed() {
+    tsFileProcessor.canBeClosed();
+    return false;
+  }
+
+  @Override
+  public Future<Boolean> flush() throws IOException {
+    return null;
+  }
+
+  @Override
+  public void close() throws ProcessorException {
+
+  }
+
+  @Override
+  public long memoryUsage() {
+    return 0;
+  }
+
+
+
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() {
+    return null;
+  }
+
+  @Override
+  public void registerStatMetadata() {
+
+  }
+
+  @Override
+  public List<String> getAllPathForStatistic() {
+    return null;
+  }
+
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() {
+    return null;
+  }
+
+  private FileSchema constructFileSchema(String processorName) throws 
WriteProcessException {
+    List<MeasurementSchema> columnSchemaList;
+    columnSchemaList = 
MManager.getInstance().getSchemaForFileName(processorName);
+
+    FileSchema schema = new FileSchema();
+    for (MeasurementSchema measurementSchema : columnSchemaList) {
+      schema.registerMeasurement(measurementSchema);
+    }
+    return schema;
+
+  }
+
+
+  private void readStoreFromDisk()
+      throws FileNodeProcessorException, IOException {
+    //only be used when recorvery, and at this time, there is no write 
operations.
+    File restoreFile = new File(fileNodeProcessorStoreFilePath);
+    if (!restoreFile.exists() || restoreFile.length() == 0) {
+      fileNodeProcessorStore = new FileNodeProcessorStore(false, new 
HashMap<>(),
+          new TsFileResource(null, false),
+          new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
+    }
+    try (FileInputStream inputStream = new 
FileInputStream(fileNodeProcessorStoreFilePath)) {
+      fileNodeProcessorStore = FileNodeProcessorStore.deSerialize(inputStream);
+    } catch (IOException e) {
+      LOGGER.error("Failed to deserialize the FileNodeRestoreFile {}, {}",
+          fileNodeProcessorStoreFilePath, e);
+      throw new FileNodeProcessorException(e);
+    }
+  }
+}

Reply via email to