This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new d9764ab1dc [NO ISSUE][OTH] Additional external indexing clean up
d9764ab1dc is described below
commit d9764ab1dc0d3f02e29aef257f5506c7e0afb297
Author: Murtadha Hubail <[email protected]>
AuthorDate: Sat Mar 25 20:03:10 2023 +0300
[NO ISSUE][OTH] Additional external indexing clean up
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
- Remove REFRESH statement.
- More external indexing runtime removal.
Change-Id: Ide588c3933979edae763b810dfa6f8a34116945f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17449
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Al Hubail <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../asterix/app/translator/QueryTranslator.java | 88 -------------
.../adapter/factory/GenericAdapterFactory.java | 22 +---
.../external/api/IExternalDataSourceFactory.java | 9 --
.../asterix/external/api/IExternalIndexer.java | 56 --------
.../external/api/IIndexibleExternalDataSource.java | 34 -----
.../external/api/IIndexingAdapterFactory.java | 27 ----
.../asterix/external/api/IIndexingDatasource.java | 50 --------
.../dataflow/IndexingDataFlowController.java | 47 -------
.../external/input/HDFSDataSourceFactory.java | 69 +++-------
.../record/reader/IndexingStreamRecordReader.java | 101 ---------------
.../AbstractExternalInputStreamFactory.java | 5 -
.../input/record/reader/hdfs/HDFSRecordReader.java | 53 +-------
.../record/reader/rss/RSSRecordReaderFactory.java | 5 -
.../reader/twitter/TwitterRecordReaderFactory.java | 5 -
.../external/input/stream/HDFSInputStream.java | 64 +---------
.../stream/factory/LocalFSInputStreamFactory.java | 5 -
.../factory/SocketServerInputStreamFactory.java | 5 -
.../factory/TwitterFirehoseStreamFactory.java | 5 -
.../provider/DataflowControllerProvider.java | 9 +-
.../apache/asterix/lang/common/base/Statement.java | 1 -
.../statement/RefreshExternalDatasetStatement.java | 64 ----------
.../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj | 19 ---
.../declared/BTreeResourceFactoryProvider.java | 6 +-
.../metadata/declared/MetadataProvider.java | 27 +---
.../metadata/lock/ExternalDatasetsRegistry.java | 142 ---------------------
.../apache/asterix/metadata/utils/DatasetUtil.java | 16 ---
.../utils/ExternalDatasetAccessManager.java | 122 ------------------
.../apache/asterix/metadata/utils/IndexUtil.java | 3 -
.../utils/SecondaryIndexOperationsHelper.java | 6 -
29 files changed, 30 insertions(+), 1035 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 399b7b4e1a..15a8238a6b 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -94,7 +94,6 @@ import org.apache.asterix.common.utils.JobUtils.ProgressState;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
-import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -187,7 +186,6 @@ import org.apache.asterix.metadata.entities.Synonym;
import org.apache.asterix.metadata.entities.ViewDetails;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil;
-import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
@@ -499,9 +497,6 @@ public class QueryTranslator extends AbstractLangTranslator
implements IStatemen
case COMPACT:
handleCompactStatement(metadataProvider, stmt, hcc);
break;
- case EXTERNAL_DATASET_REFRESH:
-
handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
- break;
case FUNCTION_DECL:
handleDeclareFunctionStatement(metadataProvider, stmt);
break;
@@ -1549,9 +1544,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc)
throws Exception {
ProgressState progress = ProgressState.NO_PROGRESS;
boolean bActiveTxn = true;
- Index filesIndex = null;
- boolean firstExternalDatasetIndex = false;
- boolean datasetLocked = false;
MetadataTransactionContext mdTxnCtx =
metadataProvider.getMetadataTxnContext();
JobSpecification spec;
try {
@@ -1559,7 +1551,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
if (ds.getDatasetType() == DatasetType.INTERNAL) {
validateDatasetState(metadataProvider, ds, sourceLoc);
} else if (ds.getDatasetType() == DatasetType.EXTERNAL) {
- // External dataset
throw new CompilationException(ErrorCode.COMPILATION_ERROR,
sourceLoc, dataset() + " using "
+ ((ExternalDatasetDetails)
ds.getDatasetDetails()).getAdapter() + " adapter can't be indexed");
}
@@ -1666,17 +1657,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
index.getDatasetName(), index.getIndexName());
index.setPendingOp(MetadataUtil.PENDING_NO_OP);
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(),
index);
- // add another new files index with PendingNoOp after deleting the
index with
- // PendingAddOp
- if (firstExternalDatasetIndex) {
-
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
index.getDataverseName(),
- index.getDatasetName(), filesIndex.getIndexName());
- filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP);
-
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(),
filesIndex);
- // update transaction timestamp
- ((ExternalDatasetDetails)
ds.getDatasetDetails()).setRefreshTimestamp(new Date());
- MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
- }
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (bActiveTxn) {
@@ -1701,38 +1681,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
}
- if (firstExternalDatasetIndex) {
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- try {
- // Drop External Files from metadata
-
MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e2) {
- e.addSuppressed(e2);
- abort(e, e2, mdTxnCtx);
- throw new IllegalStateException(
- "System is inconsistent state: pending files
for(" + index.getDataverseName() + "."
- + index.getDatasetName() + ") couldn't
be removed from the metadata",
- e);
- }
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- try {
- // Drop the files index from metadata
-
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
- index.getDataverseName(),
index.getDatasetName(),
-
IndexingConstants.getFilesIndexName(index.getDatasetName()));
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e2) {
- e.addSuppressed(e2);
- abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is
inconsistent state: pending index("
- + index.getDataverseName() + "." +
index.getDatasetName() + "."
- +
IndexingConstants.getFilesIndexName(index.getDatasetName())
- + ") couldn't be removed from the metadata",
e);
- }
- }
// remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1749,10 +1697,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
}
throw e;
- } finally {
- if (datasetLocked) {
- ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds,
firstExternalDatasetIndex);
- }
}
}
@@ -1843,7 +1787,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
doDropDataverse(stmtDropDataverse, metadataProvider, hcc,
requestParameters);
} finally {
metadataProvider.getLocks().unlock();
-
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -1856,7 +1799,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
List<FeedEventsListener> feedsToStop = new ArrayList<>();
- List<Dataset> externalDatasetsToDeregister = new ArrayList<>();
List<JobSpecification> jobsToExecute = new ArrayList<>();
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx,
dataverseName);
@@ -1906,13 +1848,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
break;
case EXTERNAL:
- indexes =
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
datasetName);
- for (Index index : indexes) {
- jobsToExecute
-
.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset,
sourceLoc));
- }
- externalDatasetsToDeregister.add(dataset);
- break;
case VIEW:
break;
}
@@ -1939,10 +1874,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
bActiveTxn = false;
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
- for (Dataset externalDataset : externalDatasetsToDeregister) {
-
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(externalDataset);
- }
-
for (FeedEventsListener feedListener : feedsToStop) {
if (feedListener.getState() != ActivityState.STOPPED) {
feedListener.stop(metadataProvider);
@@ -2045,7 +1976,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
requestParameters, true, sourceLoc);
} finally {
metadataProvider.getLocks().unlock();
-
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -2154,7 +2084,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
doDropIndex(metadataProvider, stmtIndexDrop, dataverseName,
datasetName, hcc, requestParameters);
} finally {
metadataProvider.getLocks().unlock();
-
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -2165,8 +2094,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
String indexName = stmtIndexDrop.getIndexName().getValue();
ProgressState progress = ProgressState.NO_PROGRESS;
List<JobSpecification> jobsToExecute = new ArrayList<>();
- // For external index
- boolean dropFilesIndex = false;
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
@@ -2208,8 +2135,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
// #. finally, delete the existing index
MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName,
datasetName, indexName);
- } else {
- // External dataset
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
return true;
@@ -2236,10 +2161,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
try {
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
dataverseName,
datasetName, indexName);
- if (dropFilesIndex) {
-
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
dataverseName,
- datasetName,
IndexingConstants.getFilesIndexName(datasetName));
- }
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
@@ -2648,7 +2569,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
doDropView(metadataProvider, stmtDrop, dataverseName, viewName);
} finally {
metadataProvider.getLocks().unlock();
-
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -4479,7 +4399,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
throw e;
} finally {
metadataProvider.getLocks().unlock();
-
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -4520,8 +4439,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
@Override
public void unlock() {
metadataProvider.getLocks().unlock();
- // release external datasets' locks acquired during
compilation of the query
-
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
compilationLock.readLock().unlock();
}
};
@@ -4745,11 +4662,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
}
- protected void handleExternalDatasetRefreshStatement(MetadataProvider
metadataProvider, Statement stmt,
- IHyracksClientConnection hcc) throws Exception {
- // no op
- }
-
@Override
public DataverseName getActiveDataverseName(DataverseName dataverseName) {
return dataverseName != null ? dataverseName :
activeDataverse.getDataverseName();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 8ec5af037e..9ca87b873f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -19,7 +19,6 @@
package org.apache.asterix.external.adapter.factory;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -30,13 +29,10 @@ import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
-import org.apache.asterix.external.api.IIndexibleExternalDataSource;
-import org.apache.asterix.external.api.IIndexingAdapterFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.dataset.adapter.GenericAdapter;
-import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.provider.DataflowControllerProvider;
import org.apache.asterix.external.provider.DatasourceFactoryProvider;
import org.apache.asterix.external.provider.ParserFactoryProvider;
@@ -59,7 +55,7 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class GenericAdapterFactory implements IIndexingAdapterFactory,
ITypedAdapterFactory {
+public class GenericAdapterFactory implements ITypedAdapterFactory {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager.getLogger();
@@ -67,19 +63,11 @@ public class GenericAdapterFactory implements
IIndexingAdapterFactory, ITypedAda
private IDataParserFactory dataParserFactory;
private ARecordType recordType;
private Map<String, String> configuration;
- private List<ExternalFile> files;
- private boolean indexingOp;
private boolean isFeed;
private FileSplit[] feedLogFileSplits;
private ARecordType metaType;
private transient FeedLogManager feedLogManager;
- @Override
- public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
- this.files = files;
- this.indexingOp = indexingOp;
- }
-
@Override
public String getAlias() {
return ExternalDataConstants.ALIAS_GENERIC_ADAPTER;
@@ -111,7 +99,7 @@ public class GenericAdapterFactory implements
IIndexingAdapterFactory, ITypedAda
feedLogManager.touch();
}
IDataFlowController controller =
DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
- dataSourceFactory, dataParserFactory, configuration,
indexingOp, isFeed, feedLogManager);
+ dataSourceFactory, dataParserFactory, configuration, isFeed,
feedLogManager);
if (isFeed) {
return new FeedAdapter((AbstractFeedDataFlowController)
controller);
} else {
@@ -124,9 +112,6 @@ public class GenericAdapterFactory implements
IIndexingAdapterFactory, ITypedAda
if (dataSourceFactory == null) {
dataSourceFactory = createExternalDataSourceFactory(configuration);
// create and configure parser factory
- if (dataSourceFactory.isIndexible() && (files != null)) {
- ((IIndexibleExternalDataSource)
dataSourceFactory).setSnapshot(files, indexingOp);
- }
dataSourceFactory.configure(serviceContext, configuration,
warningCollector);
}
if (dataParserFactory == null) {
@@ -145,9 +130,6 @@ public class GenericAdapterFactory implements
IIndexingAdapterFactory, ITypedAda
ICcApplicationContext appCtx = (ICcApplicationContext)
serviceContext.getApplicationContext();
ExternalDataUtils.validateDataSourceParameters(configuration);
dataSourceFactory = createExternalDataSourceFactory(configuration);
- if (dataSourceFactory.isIndexible() && (files != null)) {
- ((IIndexibleExternalDataSource)
dataSourceFactory).setSnapshot(files, indexingOp);
- }
dataSourceFactory.configure(serviceContext, configuration,
warningCollector);
ExternalDataUtils.validateDataParserParameters(configuration);
dataParserFactory = createDataParserFactory(configuration);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 17cbff446e..e5c4b3fcc4 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -67,15 +67,6 @@ public interface IExternalDataSourceFactory extends
Serializable {
void configure(IServiceContext ctx, Map<String, String> configuration,
IWarningCollector warningCollector)
throws AlgebricksException, HyracksDataException;
- /**
- * Specify whether the external data source can be indexed
- *
- * @return
- */
- default boolean isIndexible() {
- return false;
- }
-
/**
* returns the passed partition constraints if not null, otherwise returns
round robin absolute partition
* constraints that matches the count.
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
deleted file mode 100644
index c261ae3485..0000000000
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-/**
- * This Interface represents the component responsible for adding record IDs
to tuples when indexing external data
- */
-public interface IExternalIndexer extends Serializable {
-
- /**
- * This method is called by an indexible datasource when the external
source reader have been updated.
- * this gives a chance for the indexer to update its reader specific
values (i,e. file name)
- *
- * @param reader
- * the new reader
- * @throws Exception
- */
- public void reset(IIndexingDatasource reader) throws IOException;
-
- /**
- * This method is called by the dataflow controller with each tuple. the
indexer is expected to append record ids to the tuple.
- *
- * @param tb
- * @throws Exception
- */
- public void index(ArrayTupleBuilder tb) throws IOException;
-
- /**
- * This method returns the number of fields in the record id. It is used
by tuple appender at the initialization step.
- *
- * @return
- * @throws Exception
- */
- public int getNumberOfFields() throws IOException;
-}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
deleted file mode 100644
index accd730d07..0000000000
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-
-public interface IIndexibleExternalDataSource extends
IExternalDataSourceFactory {
- public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
-
- /**
- * Specify whether the external data source is configured for indexing
- *
- * @return
- */
- public boolean isIndexingOp();
-}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
deleted file mode 100644
index 8d420465ef..0000000000
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-
-public interface IIndexingAdapterFactory extends ITypedAdapterFactory {
- public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
-}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
deleted file mode 100644
index 5381ef7d2b..0000000000
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import java.util.List;
-
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.RecordReader;
-
-/**
- * An interface for external data sources which support indexing
- */
-public interface IIndexingDatasource {
- /**
- * @return an external indexer that is used to write RID fields for each
record
- */
- public IExternalIndexer getIndexer();
-
- /**
- * @return a list of external files being accessed
- */
- public List<ExternalFile> getSnapshot();
-
- /**
- * @return the index of the currently being read file
- */
- public int getCurrentSplitIndex();
-
- /**
- * @return an HDFS record reader that is used to get the current position
in the file
- */
- public RecordReader<?, ? extends Writable> getReader();
-}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
deleted file mode 100644
index b956295539..0000000000
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IRecordDataParser;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public class IndexingDataFlowController<T> extends RecordDataFlowController<T>
{
- private final IExternalIndexer indexer;
-
- public IndexingDataFlowController(IHyracksTaskContext ctx,
IRecordDataParser<T> dataParser,
- IRecordReader<? extends T> recordReader, IExternalIndexer indexer)
throws IOException {
- super(ctx, dataParser, recordReader, 1 + indexer.getNumberOfFields());
- this.indexer = indexer;
- }
-
- @Override
- protected void appendOtherTupleFields(ArrayTupleBuilder tb) throws
HyracksDataException {
- try {
- indexer.index(tb);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
-}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 596edb2801..f22d12887f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -32,12 +32,9 @@ import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexibleExternalDataSource;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import
org.apache.asterix.external.input.record.reader.IndexingStreamRecordReader;
import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader;
import
org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
@@ -62,7 +59,7 @@ import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
-public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>,
IIndexibleExternalDataSource {
+public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>,
IExternalDataSourceFactory {
private static final long serialVersionUID = 1L;
private static final List<String> recordReaderNames =
Collections.singletonList("hdfs");
@@ -77,7 +74,6 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IInd
protected static Scheduler hdfsScheduler;
protected static Boolean initialized = false;
protected static Object initLock = new Object();
- protected List<ExternalFile> files;
protected Map<String, String> configuration;
protected Class<?> recordClass;
protected boolean indexingOp = false;
@@ -107,19 +103,14 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IInd
confFactory = new ConfFactory(conf);
clusterLocations = getPartitionConstraint();
int numPartitions = clusterLocations.getLocations().length;
- // if files list was set, we restrict the splits to the list
- InputSplit[] inputSplits;
- if (files == null) {
- inputSplits = getInputSplits(conf, numPartitions);
- } else {
- inputSplits = HDFSUtils.getSplits(conf, files);
- }
- readSchedule = hdfsScheduler.getLocationConstraints(inputSplits);
- inputSplitsFactory = new InputSplitsFactory(inputSplits);
+ InputSplit[] configInputSplits = getInputSplits(conf,
numPartitions);
+ readSchedule =
hdfsScheduler.getLocationConstraints(configInputSplits);
+ inputSplitsFactory = new InputSplitsFactory(configInputSplits);
read = new boolean[readSchedule.length];
Arrays.fill(read, false);
if (formatString == null ||
formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
- RecordReader<?, ?> reader =
conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
+ RecordReader<?, ?> reader =
+
conf.getInputFormat().getRecordReader(configInputSplits[0], conf,
Reporter.NULL);
this.recordClass = reader.createValue().getClass();
reader.close();
} else if
(formatString.equals(ExternalDataConstants.FORMAT_PARQUET)) {
@@ -152,24 +143,15 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IInd
return conf.getInputFormat().getSplits(conf, numPartitions);
}
- // Used to tell the factory to restrict the splits to the intersection
between this list a
- // actual files on hde
- @Override
- public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
- this.files = files;
- this.indexingOp = indexingOp;
- }
-
/*
* The method below was modified to take care of the following
* 1. when target files are not null, it generates a file aware input
stream that validate
* against the files
* 2. if the data is binary, it returns a generic reader */
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int
partition, IExternalIndexer indexer)
- throws HyracksDataException {
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx)
throws HyracksDataException {
try {
restoreConfig(ctx);
- return new HDFSInputStream(read, inputSplits, readSchedule,
nodeName, conf, configuration, files, indexer);
+ return new HDFSInputStream(read, inputSplits, readSchedule,
nodeName, conf, configuration);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -213,10 +195,6 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IInd
}
}
- public JobConf getJobConf() throws HyracksDataException {
- return confFactory.getConf();
- }
-
@Override
public DataSourceType getDataSourceType() {
return ExternalDataUtils.getDataSourceType(configuration);
@@ -235,15 +213,10 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IInd
public IRecordReader<? extends Object>
createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
try {
- IExternalIndexer indexer = null;
if (recordReaderClazz != null) {
StreamRecordReader streamReader = (StreamRecordReader)
recordReaderClazz.getConstructor().newInstance();
- streamReader.configure(ctx, createInputStream(ctx, partition,
indexer), configuration);
- if (indexer != null) {
- return new IndexingStreamRecordReader(streamReader,
indexer);
- } else {
- return streamReader;
- }
+ streamReader.configure(ctx, createInputStream(ctx),
configuration);
+ return streamReader;
}
restoreConfig(ctx);
JobConf readerConf = conf;
@@ -257,8 +230,8 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IInd
*/
readerConf = confFactory.getConf();
}
- return createRecordReader(configuration, read, inputSplits,
readSchedule, nodeName, readerConf, files,
- indexer, ctx.getWarningCollector());
+ return createRecordReader(configuration, read, inputSplits,
readSchedule, nodeName, readerConf,
+ ctx.getWarningCollector());
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -269,29 +242,19 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IInd
return recordClass;
}
- @Override
- public boolean isIndexible() {
- return true;
- }
-
- @Override
- public boolean isIndexingOp() {
- return ((files != null) && indexingOp);
- }
-
@Override
public List<String> getRecordReaderNames() {
return recordReaderNames;
}
private static IRecordReader<? extends Object>
createRecordReader(Map<String, String> configuration, boolean[] read,
- InputSplit[] inputSplits, String[] readSchedule, String nodeName,
JobConf conf, List<ExternalFile> files,
- IExternalIndexer indexer, IWarningCollector warningCollector)
throws IOException {
+ InputSplit[] inputSplits, String[] readSchedule, String nodeName,
JobConf conf,
+ IWarningCollector warningCollector) {
if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT.trim())
.equals(ExternalDataConstants.INPUT_FORMAT_PARQUET)) {
return new ParquetFileRecordReader<>(read, inputSplits,
readSchedule, nodeName, conf, warningCollector);
} else {
- return new HDFSRecordReader<>(read, inputSplits, readSchedule,
nodeName, conf, files, indexer);
+ return new HDFSRecordReader<>(read, inputSplits, readSchedule,
nodeName, conf);
}
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
deleted file mode 100644
index 6eee8920b9..0000000000
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.indexing.ExternalFile;
-import
org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class IndexingStreamRecordReader implements IRecordReader<char[]>,
IIndexingDatasource {
-
- private StreamRecordReader reader;
- private IExternalIndexer indexer;
-
- public IndexingStreamRecordReader(StreamRecordReader reader,
IExternalIndexer indexer) {
- this.reader = reader;
- this.indexer = indexer;
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
-
- @Override
- public IExternalIndexer getIndexer() {
- return indexer;
- }
-
- @Override
- public boolean hasNext() throws Exception {
- return reader.hasNext();
- }
-
- @Override
- public IRawRecord<char[]> next() throws IOException, InterruptedException {
- return reader.next();
- }
-
- @Override
- public boolean stop() {
- return reader.stop();
- }
-
- @Override
- public void setController(AbstractFeedDataFlowController controller) {
- reader.setController(controller);
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager feedLogManager) throws
HyracksDataException {
- reader.setFeedLogManager(feedLogManager);
- }
-
- @Override
- public List<ExternalFile> getSnapshot() {
- return null;
- }
-
- @Override
- public int getCurrentSplitIndex() {
- return -1;
- }
-
- @Override
- public RecordReader<?, ? extends Writable> getReader() {
- return null;
- }
-
- @Override
- public boolean handleException(Throwable th) {
- return reader.handleException(th);
- }
-
-}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index eac4835ff7..7313b31078 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -48,11 +48,6 @@ public abstract class AbstractExternalInputStreamFactory
implements IInputStream
return DataSourceType.STREAM;
}
- @Override
- public boolean isIndexible() {
- return false;
- }
-
@Override
public abstract AsterixInputStream createInputStream(IHyracksTaskContext
ctx, int partition)
throws HyracksDataException;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
index 9fbc8007b2..48b99d1be6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
@@ -19,35 +19,18 @@
package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.IOException;
-import java.util.List;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class HDFSRecordReader<K, V extends Writable> extends
AbstractHDFSRecordReader<K, V>
- implements IIndexingDatasource {
- // Indexing variables
- private final IExternalIndexer indexer;
- private final List<ExternalFile> snapshot;
- private final FileSystem hdfs;
+public class HDFSRecordReader<K, V extends Writable> extends
AbstractHDFSRecordReader<K, V> {
public HDFSRecordReader(boolean[] read, InputSplit[] inputSplits, String[]
readSchedule, String nodeName,
- JobConf conf, List<ExternalFile> snapshot, IExternalIndexer
indexer) throws IOException {
+ JobConf conf) {
super(read, inputSplits, readSchedule, nodeName, conf);
- this.indexer = indexer;
- this.snapshot = snapshot;
- this.hdfs = FileSystem.get(conf);
}
@SuppressWarnings("unchecked")
@@ -58,26 +41,11 @@ public class HDFSRecordReader<K, V extends Writable>
extends AbstractHDFSRecordR
key = reader.createKey();
value = reader.createValue();
}
- if (indexer != null) {
- try {
- indexer.reset(this);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
return reader;
}
@Override
- protected boolean onNextInputSplit() throws IOException {
- if (snapshot != null) {
- String fileName = ((FileSplit)
(inputSplits[currentSplitIndex])).getPath().toUri().getPath();
- FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
- // Skip if not the same file stored in the files snapshot
- if (fileStatus.getModificationTime() !=
snapshot.get(currentSplitIndex).getLastModefiedTime().getTime()) {
- return true;
- }
- }
+ protected boolean onNextInputSplit() {
return false;
}
@@ -86,21 +54,6 @@ public class HDFSRecordReader<K, V extends Writable> extends
AbstractHDFSRecordR
return false;
}
- @Override
- public IExternalIndexer getIndexer() {
- return indexer;
- }
-
- @Override
- public List<ExternalFile> getSnapshot() {
- return snapshot;
- }
-
- @Override
- public int getCurrentSplitIndex() {
- return currentSplitIndex;
- }
-
@Override
public RecordReader<K, V> getReader() {
return reader;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index b52636a31d..1a5d2a25d9 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -84,11 +84,6 @@ public class RSSRecordReaderFactory implements
IRecordReaderFactory<SyndEntry> {
return recordReaderNames;
}
- @Override
- public boolean isIndexible() {
- return false;
- }
-
@Override
public IRecordReader<? extends SyndEntry>
createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 0fd0b95247..52d8c03ed5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -124,11 +124,6 @@ public class TwitterRecordReaderFactory implements
IRecordReaderFactory<char[]>
}
}
- @Override
- public boolean isIndexible() {
- return false;
- }
-
@Override
public IRecordReader<? extends char[]>
createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index 4cbfaa3026..46c21021fa 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -19,30 +19,20 @@
package org.apache.asterix.external.input.stream;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class HDFSInputStream extends AsterixInputStream implements
IIndexingDatasource {
+public class HDFSInputStream extends AsterixInputStream {
private RecordReader<Object, Text> reader;
private Text value = null;
@@ -54,16 +44,11 @@ public class HDFSInputStream extends AsterixInputStream
implements IIndexingData
private String[] readSchedule;
private String nodeName;
private JobConf conf;
- // Indexing variables
- private final IExternalIndexer indexer;
- private final List<ExternalFile> snapshot;
- private final FileSystem hdfs;
private int pos = 0;
@SuppressWarnings("unchecked")
public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[]
readSchedule, String nodeName,
- JobConf conf, Map<String, String> configuration,
List<ExternalFile> snapshot, IExternalIndexer indexer)
- throws IOException, AsterixException {
+ JobConf conf, Map<String, String> configuration) throws
IOException, AsterixException {
this.read = read;
this.inputSplits = inputSplits;
this.readSchedule = readSchedule;
@@ -71,16 +56,8 @@ public class HDFSInputStream extends AsterixInputStream
implements IIndexingData
this.conf = conf;
this.inputFormat = conf.getInputFormat();
this.reader = new EmptyRecordReader<Object, Text>();
- this.snapshot = snapshot;
- this.hdfs = FileSystem.get(conf);
- this.indexer = indexer;
nextInputSplit();
this.value = new Text();
- if (snapshot != null) {
- if (currentSplitIndex < snapshot.size()) {
- indexer.reset(this);
- }
- }
}
@Override
@@ -177,16 +154,6 @@ public class HDFSInputStream extends AsterixInputStream
implements IIndexingData
continue;
}
}
- if (snapshot != null) {
- String fileName = ((FileSplit)
(inputSplits[currentSplitIndex])).getPath().toUri().getPath();
- FileStatus fileStatus = hdfs.getFileStatus(new
Path(fileName));
- // Skip if not the same file stored in the files snapshot
- if (fileStatus.getModificationTime() !=
snapshot.get(currentSplitIndex).getLastModefiedTime()
- .getTime()) {
- continue;
- }
- }
-
reader.close();
reader = getRecordReader(currentSplitIndex);
return true;
@@ -202,33 +169,6 @@ public class HDFSInputStream extends AsterixInputStream
implements IIndexingData
key = reader.createKey();
value = reader.createValue();
}
- if (indexer != null) {
- try {
- indexer.reset(this);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
- return reader;
- }
-
- @Override
- public IExternalIndexer getIndexer() {
- return indexer;
- }
-
- @Override
- public List<ExternalFile> getSnapshot() {
- return snapshot;
- }
-
- @Override
- public int getCurrentSplitIndex() {
- return currentSplitIndex;
- }
-
- @Override
- public RecordReader<?, ? extends Writable> getReader() {
return reader;
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 58ef2a43be..cde0266a38 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -85,11 +85,6 @@ public class LocalFSInputStreamFactory implements
IInputStreamFactory {
return DataSourceType.STREAM;
}
- @Override
- public boolean isIndexible() {
- return false;
- }
-
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
IWarningCollector warningCollector) throws AsterixException {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 1bd08d94ae..d628062401 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -83,9 +83,4 @@ public class SocketServerInputStreamFactory implements
IInputStreamFactory {
public DataSourceType getDataSourceType() {
return DataSourceType.STREAM;
}
-
- @Override
- public boolean isIndexible() {
- return false;
- }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index 2b0bb55c27..ab7fa777f6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -91,11 +91,6 @@ public class TwitterFirehoseStreamFactory implements
IInputStreamFactory {
this.configuration = configuration;
}
- @Override
- public boolean isIndexible() {
- return false;
- }
-
@Override
public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int
partition) throws HyracksDataException {
try {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index f60ecdcc62..c5315094c9 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -28,7 +28,6 @@ import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
-import org.apache.asterix.external.api.IIndexingDatasource;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
@@ -43,7 +42,6 @@ import
org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController
import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
-import org.apache.asterix.external.dataflow.IndexingDataFlowController;
import org.apache.asterix.external.dataflow.RecordDataFlowController;
import org.apache.asterix.external.dataflow.StreamDataFlowController;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -61,7 +59,7 @@ public class DataflowControllerProvider {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static IDataFlowController getDataflowController(ARecordType
recordType, IHyracksTaskContext ctx,
int partition, IExternalDataSourceFactory dataSourceFactory,
IDataParserFactory dataParserFactory,
- Map<String, String> configuration, boolean indexingOp, boolean
isFeed, FeedLogManager feedLogManager)
+ Map<String, String> configuration, boolean isFeed, FeedLogManager
feedLogManager)
throws HyracksDataException {
try {
switch (dataSourceFactory.getDataSourceType()) {
@@ -72,10 +70,7 @@ public class DataflowControllerProvider {
IRecordDataParser<?> dataParser =
recordParserFactory.createRecordParser(ctx);
// TODO(ali): revisit to think about passing data source
name via setter or via createRecordParser
dataParser.configure(recordReader.getDataSourceName(),
recordReader.getLineNumber());
- if (indexingOp) {
- return new IndexingDataFlowController(ctx, dataParser,
recordReader,
- ((IIndexingDatasource)
recordReader).getIndexer());
- } else if (isFeed) {
+ if (isFeed) {
boolean isChangeFeed =
ExternalDataUtils.isChangeFeed(configuration);
boolean isRecordWithMeta =
ExternalDataUtils.isRecordWithMeta(configuration);
if (isRecordWithMeta) {
diff --git
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index 31a9c51007..05d53b17ce 100644
---
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -109,7 +109,6 @@ public interface Statement extends ILangExpression {
ANALYZE,
ANALYZE_DROP,
COMPACT,
- EXTERNAL_DATASET_REFRESH,
SUBSCRIBE_FEED,
EXTENSION,
}
diff --git
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/RefreshExternalDatasetStatement.java
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/RefreshExternalDatasetStatement.java
deleted file mode 100644
index 30a02fb5a4..0000000000
---
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/RefreshExternalDatasetStatement.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.lang.common.statement;
-
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.lang.common.base.AbstractStatement;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-
-public class RefreshExternalDatasetStatement extends AbstractStatement {
-
- private DataverseName dataverseName;
- private Identifier datasetName;
-
- public Identifier getDatasetName() {
- return datasetName;
- }
-
- public void setDatasetName(Identifier datasetName) {
- this.datasetName = datasetName;
- }
-
- public DataverseName getDataverseName() {
- return dataverseName;
- }
-
- public void setDataverseName(DataverseName dataverseName) {
- this.dataverseName = dataverseName;
- }
-
- @Override
- public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
- return null;
- }
-
- @Override
- public Kind getKind() {
- return Statement.Kind.EXTERNAL_DATASET_REFRESH;
- }
-
- @Override
- public byte getCategory() {
- return Category.UPDATE;
- }
-
-}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 1c4c9773a4..5ae1eb5aa8 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -162,7 +162,6 @@ import
org.apache.asterix.lang.common.statement.LoadStatement;
import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
import org.apache.asterix.lang.common.statement.NodegroupDecl;
import org.apache.asterix.lang.common.statement.Query;
-import
org.apache.asterix.lang.common.statement.RefreshExternalDatasetStatement;
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.statement.SynonymDropStatement;
import org.apache.asterix.lang.common.statement.TypeDecl;
@@ -939,7 +938,6 @@ Statement SingleStatement() throws ParseException:
| stmt = CompactStatement()
| stmt = AnalyzeStatement()
| stmt = Query()
- | stmt = RefreshExternalDatasetStatement()
)
{
return stmt;
@@ -1264,22 +1262,6 @@ void DatasetRecordField(RecordTypeDefinition recType)
throws ParseException:
}
}
-RefreshExternalDatasetStatement RefreshExternalDatasetStatement() throws
ParseException:
-{
- Token startToken = null;
- Pair<DataverseName,Identifier> nameComponents = null;
- String datasetName = null;
-}
-{
- <REFRESH> { startToken = token; } <EXTERNAL> Dataset() nameComponents =
QualifiedName()
- {
- RefreshExternalDatasetStatement stmt = new
RefreshExternalDatasetStatement();
- stmt.setDataverseName(nameComponents.first);
- stmt.setDatasetName(nameComponents.second);
- return addSourceLocation(stmt, startToken);
- }
-}
-
CreateIndexStatement CreateIndexStatement(Token startStmtToken) throws
ParseException:
{
CreateIndexStatement stmt = null;
@@ -5638,7 +5620,6 @@ TOKEN [IGNORE_CASE]:
| <PRESORTED : "pre-sorted">
| <PRIMARY : "primary">
| <RAW : "raw">
- | <REFRESH : "refresh">
| <RETURN : "return">
| <RETURNING : "returning">
| <RIGHT : "right">
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index 91edc4c63c..88e96e73ab 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -142,7 +142,7 @@ public class BTreeResourceFactoryProvider implements
IResourceFactoryProvider {
return primaryTypeTraits;
} else if (dataset.getDatasetType() == DatasetType.EXTERNAL
&&
index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName())))
{
- return null;
+ return new ITypeTraits[0];
}
Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails)
index.getIndexDetails();
int numPrimaryKeys = dataset.getPrimaryKeys().size();
@@ -175,7 +175,7 @@ public class BTreeResourceFactoryProvider implements
IResourceFactoryProvider {
return dataset.getPrimaryComparatorFactories(metadataProvider,
recordType, metaType);
} else if (dataset.getDatasetType() == DatasetType.EXTERNAL
&&
index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName())))
{
- return null;
+ return new IBinaryComparatorFactory[0];
}
Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails)
index.getIndexDetails();
int numPrimaryKeys = dataset.getPrimaryKeys().size();
@@ -210,7 +210,7 @@ public class BTreeResourceFactoryProvider implements
IResourceFactoryProvider {
if (dataset.getDatasetType() == DatasetType.EXTERNAL
&& index.getIndexType() != DatasetConfig.IndexType.SAMPLE) {
if
(index.getIndexName().equals(IndexingConstants.getFilesIndexName(dataset.getDatasetName())))
{
- return null;
+ return new int[0];
} else {
Index.ValueIndexDetails indexDetails =
((Index.ValueIndexDetails) index.getIndexDetails());
return new int[] { indexDetails.getKeyFieldNames().size() };
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 409f0e73dc..550864485e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -28,7 +28,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -36,7 +35,6 @@ import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -59,8 +57,6 @@ import
org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUt
import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.provider.AdapterFactoryProvider;
@@ -893,27 +889,8 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
try {
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
dataset.getDataverseName().getCanonicalForm());
- ITypedAdapterFactory adapterFactory =
-
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
adapterName,
- configuration, itemType, metaType,
warningCollector);
-
- // check to see if dataset is indexed
- Index filesIndex =
- MetadataManager.INSTANCE.getIndex(mdTxnCtx,
dataset.getDataverseName(), dataset.getDatasetName(),
-
dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
-
- if (filesIndex != null && filesIndex.getPendingOp() == 0) {
- // get files
- List<ExternalFile> files =
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
- Iterator<ExternalFile> iterator = files.iterator();
- while (iterator.hasNext()) {
- if (iterator.next().getPendingOp() !=
ExternalFilePendingOp.NO_OP) {
- iterator.remove();
- }
- }
- }
-
- return adapterFactory;
+ return
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
adapterName,
+ configuration, itemType, metaType, warningCollector);
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter", e);
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
deleted file mode 100644
index 4d8bacf15c..0000000000
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.lock;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.utils.ExternalDatasetAccessManager;
-
-/**
- * This is a singelton class used to maintain the version of each external
dataset with indexes
- * It should be consolidated once a better global dataset lock management is
introduced.
- *
- * @author alamouda
- */
-public class ExternalDatasetsRegistry {
- public static final ExternalDatasetsRegistry INSTANCE = new
ExternalDatasetsRegistry();
- private final ConcurrentHashMap<String, ExternalDatasetAccessManager>
globalRegister;
-
- private ExternalDatasetsRegistry() {
- globalRegister = new ConcurrentHashMap<>();
- }
-
- /**
- * Get the current version of the dataset
- *
- * @param dataset
- * @return
- */
- public int getDatasetVersion(Dataset dataset) {
- String key = dataset.getDataverseName() + "." +
dataset.getDatasetName();
- ExternalDatasetAccessManager datasetAccessMgr =
globalRegister.get(key);
- if (datasetAccessMgr == null) {
- globalRegister.putIfAbsent(key, new
ExternalDatasetAccessManager());
- datasetAccessMgr = globalRegister.get(key);
- }
- return datasetAccessMgr.getVersion();
- }
-
- public int getAndLockDatasetVersion(Dataset dataset, MetadataProvider
metadataProvider) {
-
- Map<String, Integer> locks;
- String lockKey = dataset.getDataverseName() + "." +
dataset.getDatasetName();
- // check first if the lock was aquired already
- locks = metadataProvider.getExternalDataLocks();
- if (locks == null) {
- locks = new HashMap<>();
- metadataProvider.setExternalDataLocks(locks);
- } else {
- // if dataset was accessed already by this job, return the
registered version
- Integer version = locks.get(lockKey);
- if (version != null) {
- return version;
- }
- }
-
- ExternalDatasetAccessManager datasetAccessMgr =
globalRegister.get(lockKey);
- if (datasetAccessMgr == null) {
- globalRegister.putIfAbsent(lockKey, new
ExternalDatasetAccessManager());
- datasetAccessMgr = globalRegister.get(lockKey);
- }
-
- // aquire the correct lock
- int version = datasetAccessMgr.queryBegin();
- locks.put(lockKey, version);
- return version;
- }
-
- public void refreshBegin(Dataset dataset) {
- String key = dataset.getDataverseName() + "." +
dataset.getDatasetName();
- ExternalDatasetAccessManager datasetAccessMgr =
globalRegister.get(key);
- if (datasetAccessMgr == null) {
- datasetAccessMgr = globalRegister.put(key, new
ExternalDatasetAccessManager());
- }
- // aquire the correct lock
- datasetAccessMgr.refreshBegin();
- }
-
- public void removeDatasetInfo(Dataset dataset) {
- String key = dataset.getDataverseName() + "." +
dataset.getDatasetName();
- globalRegister.remove(key);
- }
-
- public void refreshEnd(Dataset dataset, boolean success) {
- String key = dataset.getDataverseName() + "." +
dataset.getDatasetName();
- globalRegister.get(key).refreshEnd(success);
- }
-
- public void buildIndexBegin(Dataset dataset, boolean firstIndex) {
- String key = dataset.getDataverseName() + "." +
dataset.getDatasetName();
- ExternalDatasetAccessManager datasetAccessMgr =
globalRegister.get(key);
- if (datasetAccessMgr == null) {
- globalRegister.putIfAbsent(key, new
ExternalDatasetAccessManager());
- datasetAccessMgr = globalRegister.get(key);
- }
- // aquire the correct lock
- datasetAccessMgr.buildIndexBegin(firstIndex);
- }
-
- public void buildIndexEnd(Dataset dataset, boolean firstIndex) {
- String key = dataset.getDataverseName() + "." +
dataset.getDatasetName();
- globalRegister.get(key).buildIndexEnd(firstIndex);
- }
-
- public void releaseAcquiredLocks(MetadataProvider metadataProvider) {
- Map<String, Integer> locks = metadataProvider.getExternalDataLocks();
- if (locks == null) {
- return;
- } else {
- // if dataset was accessed already by this job, return the
registered version
- Set<Entry<String, Integer>> aquiredLocks = locks.entrySet();
- for (Entry<String, Integer> entry : aquiredLocks) {
- ExternalDatasetAccessManager accessManager =
globalRegister.get(entry.getKey());
- if (accessManager != null) {
- accessManager.queryEnd(entry.getValue());
- }
- }
- locks.clear();
- }
- }
-}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 083b9641a6..79bf3e8454 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -43,7 +43,6 @@ import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -319,21 +318,6 @@ public class DatasetUtil {
return specPrimary;
}
- public static JobSpecification buildDropFilesIndexJobSpec(MetadataProvider
metadataProvider, Dataset dataset)
- throws AlgebricksException {
- String indexName =
IndexingConstants.getFilesIndexName(dataset.getDatasetName());
- JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset,
indexName);
- IIndexDataflowHelperFactory indexHelperFactory = new
IndexDataflowHelperFactory(
-
metadataProvider.getStorageComponentProvider().getStorageManager(),
splitsAndConstraint.first);
- IndexDropOperatorDescriptor btreeDrop = new
IndexDropOperatorDescriptor(spec, indexHelperFactory);
-
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
btreeDrop,
- splitsAndConstraint.second);
- spec.addRoot(btreeDrop);
- return spec;
- }
-
public static JobSpecification createDatasetJobSpec(Dataset dataset,
MetadataProvider metadataProvider)
throws AlgebricksException {
Index index = IndexUtil.getPrimaryIndex(dataset);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetAccessManager.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetAccessManager.java
deleted file mode 100644
index b94acf3988..0000000000
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetAccessManager.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class ExternalDatasetAccessManager {
- // a version to indicate the current version of the dataset
- private int version;
- // a lock to allow concurrent build index operation and serialize refresh
operations
- private ReentrantReadWriteLock datasetLock;
- // a lock per version of the dataset to keep a version alive while queries
are still assigned to it
- private ReentrantReadWriteLock v0Lock;
- private ReentrantReadWriteLock v1Lock;
-
- public ExternalDatasetAccessManager() {
- this.version = 0;
- this.v0Lock = new ReentrantReadWriteLock(false);
- this.v1Lock = new ReentrantReadWriteLock(false);
- this.datasetLock = new ReentrantReadWriteLock(true);
- }
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public ReentrantReadWriteLock getV0Lock() {
- return v0Lock;
- }
-
- public void setV0Lock(ReentrantReadWriteLock v0Lock) {
- this.v0Lock = v0Lock;
- }
-
- public ReentrantReadWriteLock getV1Lock() {
- return v1Lock;
- }
-
- public void setV1Lock(ReentrantReadWriteLock v1Lock) {
- this.v1Lock = v1Lock;
- }
-
- public int refreshBegin() {
- datasetLock.writeLock().lock();
- if (version == 0) {
- v1Lock.writeLock().lock();
- } else {
- v0Lock.writeLock().lock();
- }
- return version;
- }
-
- public void refreshEnd(boolean success) {
- if (version == 0) {
- v1Lock.writeLock().unlock();
- if (success) {
- version = 1;
- }
- } else {
- v0Lock.writeLock().unlock();
- if (success) {
- version = 0;
- }
- }
- datasetLock.writeLock().unlock();
- }
-
- public synchronized int buildIndexBegin(boolean isFirstIndex) {
- if (isFirstIndex) {
- datasetLock.writeLock().lock();
- } else {
- datasetLock.readLock().lock();
- }
- return version;
- }
-
- public void buildIndexEnd(boolean isFirstIndex) {
- if (isFirstIndex) {
- datasetLock.writeLock().unlock();
- } else {
- datasetLock.readLock().unlock();
- }
- }
-
- public int queryBegin() {
- if (version == 0) {
- v0Lock.readLock().lock();
- return 0;
- } else {
- v1Lock.readLock().lock();
- return 1;
- }
- }
-
- public void queryEnd(int version) {
- if (version == 0) {
- v0Lock.readLock().unlock();
- } else {
- v1Lock.readLock().unlock();
- }
- }
-}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 4580a8db36..808b8a4578 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -174,9 +174,6 @@ public class IndexUtil {
secondaryIndexHelper =
SecondaryTreeIndexOperationsHelper.createIndexOperationsHelper(dataset, index,
metadataProvider, sourceLoc);
}
- if (files != null) {
- ((SecondaryIndexOperationsHelper)
secondaryIndexHelper).setExternalFiles(files);
- }
return secondaryIndexHelper.buildLoadingJobSpec();
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 25b138dbd9..ea7da19bf9 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -29,7 +29,6 @@ import
org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -119,7 +118,6 @@ public abstract class SecondaryIndexOperationsHelper
implements ISecondaryIndexO
protected int[] primaryFilterFields;
protected int[] primaryBTreeFields;
protected int[] secondaryBTreeFields;
- protected List<ExternalFile> externalFiles;
protected int numPrimaryKeys;
protected final SourceLocation sourceLoc;
protected final int sortNumFrames;
@@ -519,10 +517,6 @@ public abstract class SecondaryIndexOperationsHelper
implements ISecondaryIndexO
new RecordDescriptor[] { secondaryRecDesc });
}
- public void setExternalFiles(List<ExternalFile> externalFiles) {
- this.externalFiles = externalFiles;
- }
-
@Override
public RecordDescriptor getSecondaryRecDesc() {
return secondaryRecDesc;