This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/refactor_overflow by this push:
new 738a6d6 temporary commit
738a6d6 is described below
commit 738a6d6a6628711cce1f778caf9b7b79cffea45b
Author: xiangdong huang <[email protected]>
AuthorDate: Thu Apr 25 08:18:07 2019 +0800
temporary commit
---
.../db/engine/overflowdata/OverflowProcessor.java | 11 +
.../db/engine/sgmanager/StorageGroupProcessor.java | 494 +++++++++++----------
2 files changed, 259 insertions(+), 246 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
index 6f460a6..6c31247 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
@@ -20,14 +20,24 @@
package org.apache.iotdb.db.engine.overflowdata;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.bufferwrite.Action;
+import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
+import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.sgmanager.OperationResult;
import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
public class OverflowProcessor extends TsFileProcessor {
@@ -69,4 +79,5 @@ public class OverflowProcessor extends TsFileProcessor {
return null;
}
+
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index 4cc0d6a..70d2ec0 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -1,246 +1,248 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied. See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
-//
-//package org.apache.iotdb.db.engine.sgmanager;
-//
-//import java.io.File;
-//import java.io.FileInputStream;
-//import java.io.IOException;
-//import java.util.ArrayList;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.concurrent.Future;
-//import java.util.concurrent.atomic.AtomicLong;
-//import org.apache.iotdb.db.conf.IoTDBConstant;
-//import org.apache.iotdb.db.conf.IoTDBDescriptor;
-//import org.apache.iotdb.db.engine.Processor;
-//import org.apache.iotdb.db.engine.bufferwrite.Action;
-//import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
-//import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStore;
-//import org.apache.iotdb.db.engine.filenode.TsFileResource;
-//import org.apache.iotdb.db.engine.modification.Modification;
-//import org.apache.iotdb.db.engine.overflowdata.OverflowProcessor;
-//import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
-//import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
-//import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-//import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-//import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
-//import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
-//import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
-//import org.apache.iotdb.db.engine.version.VersionController;
-//import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-//import org.apache.iotdb.db.exception.FileNodeManagerException;
-//import org.apache.iotdb.db.exception.FileNodeProcessorException;
-//import org.apache.iotdb.db.exception.ProcessorException;
-//import org.apache.iotdb.db.metadata.MManager;
-//import org.apache.iotdb.db.monitor.IStatistic;
-//import org.apache.iotdb.db.monitor.MonitorConstants;
-//import org.apache.iotdb.db.monitor.StatMonitor;
-//import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-//import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-//import org.apache.iotdb.db.query.context.QueryContext;
-//import org.apache.iotdb.db.utils.QueryUtils;
-//import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-//import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-//import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-//import org.apache.iotdb.tsfile.read.common.Path;
-//import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-//import org.apache.iotdb.tsfile.utils.Pair;
-//import org.apache.iotdb.tsfile.write.record.TSRecord;
-//import org.apache.iotdb.tsfile.write.schema.FileSchema;
-//import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//public class StorageGroupProcessor extends Processor implements IStatistic {
-//
-// private static final Logger LOGGER =
LoggerFactory.getLogger(StorageGroupProcessor.class);
-// private static final String RESTORE_FILE_SUFFIX = ".restore";
-//
-// TsFileProcessor tsFileProcessor;
-// OverflowProcessor overflowProcessor;
-// private FileNodeProcessorStore fileNodeProcessorStore;
-// String fileNodeProcessorStoreFilePath;
-//
-// //the version controller is shared by tsfile and overflow processor.
-// private VersionController versionController;
-//
-// private FileSchema fileSchema;
-//
-// Action beforeFlushAction = () -> {};
-// Action afterFlushAction = () -> {};
-// Action afterCloseAction = () -> {};
-//
-// /**
-// * Construct processor using name space seriesPath
-// */
-// public StorageGroupProcessor(String processorName)
-// throws IOException, BufferWriteProcessorException,
WriteProcessException, FileNodeProcessorException {
-// super(processorName);
-//
-// this.fileSchema = constructFileSchema(processorName);
-//
-// File restoreFolder = new
File(IoTDBDescriptor.getInstance().getConfig().getFileNodeDir(),
-// processorName);
-// if (!restoreFolder.exists()) {
-// restoreFolder.mkdirs();
-// LOGGER.info("The restore directory of the filenode processor {}
doesn't exist. Create new " +
-// "directory {}",
-// getProcessorName(), restoreFolder.getAbsolutePath());
-// }
-// versionController = new
SimpleFileVersionController(restoreFolder.getAbsolutePath());
-// tsFileProcessor = new TsFileProcessor(processorName, beforeFlushAction,
afterFlushAction,
-// afterCloseAction, versionController, fileSchema);
-// overflowProcessor = new OverflowProcessor(processorName,
beforeFlushAction, afterFlushAction,
-// afterCloseAction, versionController, fileSchema);
-//
-// fileNodeProcessorStoreFilePath = new File(restoreFolder, processorName +
RESTORE_FILE_SUFFIX)
-// .getPath();
-// try {
-// readStoreFromDisk();
-// } catch (FileNodeProcessorException | IOException e) {
-// LOGGER.error(
-// "The fileNode processor {} encountered an error when recoverying
restore " +
-// "information.", processorName);
-// throw new FileNodeProcessorException(e);
-// }
-//
-// // RegistStatService
-// if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
-// String statStorageDeltaName =
-// MonitorConstants.STAT_STORAGE_GROUP_PREFIX +
MonitorConstants.MONITOR_PATH_SEPARATOR
-// + MonitorConstants.FILE_NODE_PATH +
MonitorConstants.MONITOR_PATH_SEPARATOR
-// + processorName.replaceAll("\\.", "_");
-// StatMonitor statMonitor = StatMonitor.getInstance();
-// registerStatMetadata();
-// statMonitor.registerStatistics(statStorageDeltaName, this);
-// }
-// }
-//
-// public OperationResult insert(InsertPlan insertPlan) throws IOException,
BufferWriteProcessorException {
-// OperationResult result = tsFileProcessor.insert(insertPlan);
-// if (result == OperationResult.WRITE_REJECT_BY_TIME) {
-// result = overflowProcessor.insert(insertPlan);
-// }
-// return result;
-// }
-//
-// public void update(UpdatePlan plan) {
-// overflowProcessor.update(plan);
-// }
-//
-// public void delete(String device, String measurementId, long timestamp)
throws IOException {
-// tsFileProcessor.delete(device, measurementId, timestamp);
-// }
-//
-// /**
-// * query data.
-// */
-// public QueryDataSource query(SingleSeriesExpression seriesExpression,
QueryContext context)
-// throws FileNodeManagerException, IOException {
-// QueryDataSource tsfileData = tsFileProcessor.query(seriesExpression,
context);
-// QueryDataSource overflowData = overflowProcessor.query(seriesExpression,
context)
-// }
-//
-//
-// public void addExternalTsFile() {
-//
-// }
-//
-// public void addExternalOverflowFile() {
-//
-// }
-//
-//
-//
-//
-// @Override
-// public boolean canBeClosed() {
-// return false;
-// }
-//
-// @Override
-// public Future<Boolean> flush() throws IOException {
-// return null;
-// }
-//
-// @Override
-// public void close() throws ProcessorException {
-//
-// }
-//
-// @Override
-// public long memoryUsage() {
-// return 0;
-// }
-//
-//
-//
-// @Override
-// public Map<String, TSRecord> getAllStatisticsValue() {
-// return null;
-// }
-//
-// @Override
-// public void registerStatMetadata() {
-//
-// }
-//
-// @Override
-// public List<String> getAllPathForStatistic() {
-// return null;
-// }
-//
-// @Override
-// public Map<String, AtomicLong> getStatParamsHashMap() {
-// return null;
-// }
-//
-// private FileSchema constructFileSchema(String processorName) throws
WriteProcessException {
-// List<MeasurementSchema> columnSchemaList;
-// columnSchemaList =
MManager.getInstance().getSchemaForFileName(processorName);
-//
-// FileSchema schema = new FileSchema();
-// for (MeasurementSchema measurementSchema : columnSchemaList) {
-// schema.registerMeasurement(measurementSchema);
-// }
-// return schema;
-//
-// }
-//
-//
-// private void readStoreFromDisk()
-// throws FileNodeProcessorException, IOException {
-// //only be used when recorvery, and at this time, there is no write
operations.
-// File restoreFile = new File(fileNodeProcessorStoreFilePath);
-// if (!restoreFile.exists() || restoreFile.length() == 0) {
-// fileNodeProcessorStore = new FileNodeProcessorStore(false, new
HashMap<>(),
-// new TsFileResource(null, false),
-// new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
-// }
-// try (FileInputStream inputStream = new
FileInputStream(fileNodeProcessorStoreFilePath)) {
-// fileNodeProcessorStore =
FileNodeProcessorStore.deSerialize(inputStream);
-// } catch (IOException e) {
-// LOGGER.error("Failed to deserialize the FileNodeRestoreFile {}, {}",
-// fileNodeProcessorStoreFilePath, e);
-// throw new FileNodeProcessorException(e);
-// }
-// }
-//}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.sgmanager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.Processor;
+import org.apache.iotdb.db.engine.bufferwrite.Action;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStore;
+import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.overflowdata.OverflowProcessor;
+import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
+import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessor;
+import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.FileNodeProcessorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StorageGroupProcessor extends Processor implements IStatistic {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StorageGroupProcessor.class);
+ private static final String RESTORE_FILE_SUFFIX = ".restore";
+
+ TsFileProcessor tsFileProcessor;
+ OverflowProcessor overflowProcessor;
+ private FileNodeProcessorStore fileNodeProcessorStore;
+ String fileNodeProcessorStoreFilePath;
+
+ //the version controller is shared by tsfile and overflow processor.
+ private VersionController versionController;
+
+ private FileSchema fileSchema;
+
+ Action beforeFlushAction = () -> {};
+ Action afterFlushAction = () -> {};
+ Action afterCloseAction = () -> {};
+
+ /**
+ * Construct processor using name space seriesPath
+ */
+ public StorageGroupProcessor(String processorName)
+ throws IOException, BufferWriteProcessorException,
WriteProcessException, FileNodeProcessorException {
+ super(processorName);
+
+ this.fileSchema = constructFileSchema(processorName);
+
+ File restoreFolder = new
File(IoTDBDescriptor.getInstance().getConfig().getFileNodeDir(),
+ processorName);
+ if (!restoreFolder.exists()) {
+ restoreFolder.mkdirs();
+ LOGGER.info("The restore directory of the filenode processor {} doesn't
exist. Create new " +
+ "directory {}",
+ getProcessorName(), restoreFolder.getAbsolutePath());
+ }
+ versionController = new
SimpleFileVersionController(restoreFolder.getAbsolutePath());
+ tsFileProcessor = new TsFileProcessor(processorName, beforeFlushAction,
afterFlushAction,
+ afterCloseAction, versionController, fileSchema);
+ overflowProcessor = new OverflowProcessor(processorName,
beforeFlushAction, afterFlushAction,
+ afterCloseAction, versionController, fileSchema);
+
+ fileNodeProcessorStoreFilePath = new File(restoreFolder, processorName +
RESTORE_FILE_SUFFIX)
+ .getPath();
+ try {
+ readStoreFromDisk();
+ } catch (FileNodeProcessorException | IOException e) {
+ LOGGER.error(
+          "The fileNode processor {} encountered an error when recovering
restore " +
+ "information.", processorName);
+ throw new FileNodeProcessorException(e);
+ }
+
+    // Register this processor with the StatMonitor service
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
+ String statStorageDeltaName =
+ MonitorConstants.STAT_STORAGE_GROUP_PREFIX +
MonitorConstants.MONITOR_PATH_SEPARATOR
+ + MonitorConstants.FILE_NODE_PATH +
MonitorConstants.MONITOR_PATH_SEPARATOR
+ + processorName.replaceAll("\\.", "_");
+ StatMonitor statMonitor = StatMonitor.getInstance();
+ registerStatMetadata();
+ statMonitor.registerStatistics(statStorageDeltaName, this);
+ }
+ }
+
+ public OperationResult insert(InsertPlan insertPlan) throws IOException,
BufferWriteProcessorException {
+ OperationResult result = tsFileProcessor.insert(insertPlan);
+ if (result == OperationResult.WRITE_REJECT_BY_TIME) {
+ result = overflowProcessor.insert(insertPlan);
+ }
+ return result;
+ }
+
+ public void update(UpdatePlan plan) {
+ overflowProcessor.update(plan);
+ }
+
+ public void delete(String device, String measurementId, long timestamp)
throws IOException {
+ tsFileProcessor.delete(device, measurementId, timestamp);
+ }
+
+ /**
+ * query data.
+ */
+ public QueryDataSource query(SingleSeriesExpression seriesExpression,
QueryContext context)
+ throws FileNodeManagerException, IOException {
+ GlobalSortedSeriesDataSource tsfileData =
tsFileProcessor.query(seriesExpression, context);
+ GlobalSortedSeriesDataSource overflowData =
overflowProcessor.query(seriesExpression, context);
+ return null;
+ }
+
+
+ public void addExternalTsFile() {
+
+ }
+
+ public void addExternalOverflowFile() {
+
+ }
+
+
+
+
+ @Override
+ public boolean canBeClosed() {
+ tsFileProcessor.canBeClosed();
+ return false;
+ }
+
+ @Override
+ public Future<Boolean> flush() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws ProcessorException {
+
+ }
+
+ @Override
+ public long memoryUsage() {
+ return 0;
+ }
+
+
+
+ @Override
+ public Map<String, TSRecord> getAllStatisticsValue() {
+ return null;
+ }
+
+ @Override
+ public void registerStatMetadata() {
+
+ }
+
+ @Override
+ public List<String> getAllPathForStatistic() {
+ return null;
+ }
+
+ @Override
+ public Map<String, AtomicLong> getStatParamsHashMap() {
+ return null;
+ }
+
+ private FileSchema constructFileSchema(String processorName) throws
WriteProcessException {
+ List<MeasurementSchema> columnSchemaList;
+ columnSchemaList =
MManager.getInstance().getSchemaForFileName(processorName);
+
+ FileSchema schema = new FileSchema();
+ for (MeasurementSchema measurementSchema : columnSchemaList) {
+ schema.registerMeasurement(measurementSchema);
+ }
+ return schema;
+
+ }
+
+
+ private void readStoreFromDisk()
+ throws FileNodeProcessorException, IOException {
+    //only used during recovery, and at this time, there are no write
operations.
+ File restoreFile = new File(fileNodeProcessorStoreFilePath);
+ if (!restoreFile.exists() || restoreFile.length() == 0) {
+ fileNodeProcessorStore = new FileNodeProcessorStore(false, new
HashMap<>(),
+ new TsFileResource(null, false),
+ new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
+ }
+ try (FileInputStream inputStream = new
FileInputStream(fileNodeProcessorStoreFilePath)) {
+ fileNodeProcessorStore = FileNodeProcessorStore.deSerialize(inputStream);
+ } catch (IOException e) {
+ LOGGER.error("Failed to deserialize the FileNodeRestoreFile {}, {}",
+ fileNodeProcessorStoreFilePath, e);
+ throw new FileNodeProcessorException(e);
+ }
+ }
+}