http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java deleted file mode 100644 index 3216bfe..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.management; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.active.ActiveEvent; -import org.apache.asterix.active.ActiveEvent.Kind; -import org.apache.asterix.active.ActiveLifecycleListener; -import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.ActivityState; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.active.IActiveEventSubscriber; -import org.apache.asterix.active.message.ActiveManagerMessage; -import org.apache.asterix.active.message.ActivePartitionMessage; -import org.apache.asterix.active.message.StatsRequestMessage; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.common.messaging.api.ICCMessageBroker; -import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.asterix.common.metadata.IDataset; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobStatus; - -public class ActiveEntityEventsListener implements IActiveEntityEventsListener { - - private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName()); - - enum RequestState { - INIT, - STARTED, - FINISHED - } - - // members - protected volatile ActivityState state; - protected JobId jobId; - protected final List<IActiveEventSubscriber> subscribers = new ArrayList<>(); - protected final ICcApplicationContext appCtx; - protected final EntityId entityId; - protected final List<IDataset> datasets; - protected final ActiveEvent statsUpdatedEvent; - protected long statsTimestamp; - protected String stats; - protected RequestState statsRequestState; - protected final String runtimeName; - protected final AlgebricksAbsolutePartitionConstraint locations; - protected int numRegistered; - - public ActiveEntityEventsListener(ICcApplicationContext appCtx, EntityId entityId, List<IDataset> datasets, - AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { - this.appCtx = appCtx; - this.entityId = entityId; - this.datasets = datasets; - this.state = ActivityState.STOPPED; - this.statsTimestamp = -1; - this.statsRequestState = RequestState.INIT; - this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId); - this.stats = "{\"Stats\":\"N/A\"}"; - this.runtimeName = runtimeName; - this.locations = locations; - this.numRegistered = 0; - } - - @Override - public synchronized void notify(ActiveEvent event) { - try { - LOGGER.finer("EventListener is notified."); - ActiveEvent.Kind eventKind = event.getEventKind(); - switch (eventKind) { - case JOB_CREATED: - state = ActivityState.CREATED; - break; - case JOB_STARTED: - start(event); - break; - case JOB_FINISHED: - finish(); - break; - case PARTITION_EVENT: - handle((ActivePartitionMessage) event.getEventObject()); - break; - default: - LOGGER.log(Level.WARNING, "Unhandled feed event notification: " + event); - break; - } - notifySubscribers(event); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Unhandled Exception", e); - } - } - - protected synchronized void handle(ActivePartitionMessage message) { - if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) { - numRegistered++; - if (numRegistered == locations.getLocations().length) { - state = ActivityState.STARTED; - } - } - } - - private void finish() throws Exception { - IHyracksClientConnection hcc = appCtx.getHcc(); - JobStatus status = hcc.getJobStatus(jobId); - state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED; - ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - activeLcListener.getNotificationHandler().removeListener(this); - } - - private void start(ActiveEvent event) { - this.jobId = event.getJobId(); - state = ActivityState.STARTING; - } - - @Override - public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException { - if (this.state == ActivityState.FAILED) { - throw new RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY); - } - subscriber.subscribed(this); - if (!subscriber.isDone()) { - subscribers.add(subscriber); - } - } - - @Override - public EntityId getEntityId() { - return entityId; - } - - @Override - public ActivityState getState() { - return state; - } - - @Override - public boolean isEntityUsingDataset(IDataset dataset) { - return datasets.contains(dataset); - } - - public JobId getJobId() { - return jobId; - } - - @Override - public String getStats() { - return stats; - } - - @Override - public long getStatsTimeStamp() { - return statsTimestamp; - } - - public String formatStats(List<String> responses) { - StringBuilder strBuilder = new StringBuilder(); - strBuilder.append("{\"Stats\": [").append(responses.get(0)); - for (int i = 1; i < responses.size(); i++) { - strBuilder.append(", ").append(responses.get(i)); - } - strBuilder.append("]}"); - return strBuilder.toString(); - } - - @SuppressWarnings("unchecked") - @Override - public void refreshStats(long timeout) throws HyracksDataException { - LOGGER.log(Level.INFO, "refreshStats called"); - synchronized (this) { - if (state != ActivityState.STARTED || statsRequestState == RequestState.STARTED) { - LOGGER.log(Level.INFO, "returning immediately since state = " + state + " and statsRequestState = " - + statsRequestState); - return; - } else { - statsRequestState = RequestState.STARTED; - } - } - ICCMessageBroker messageBroker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - long reqId = messageBroker.newRequestId(); - List<INcAddressedMessage> requests = new ArrayList<>(); - List<String> ncs = Arrays.asList(locations.getLocations()); - for (int i = 0; i < ncs.size(); i++) { - requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS, - new ActiveRuntimeId(entityId, runtimeName, i), reqId)); - } - try { - List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout); - stats = formatStats(responses); - statsTimestamp = System.currentTimeMillis(); - notifySubscribers(statsUpdatedEvent); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - // Same as above - statsRequestState = RequestState.FINISHED; - } - - protected synchronized void notifySubscribers(ActiveEvent event) { - notifyAll(); - Iterator<IActiveEventSubscriber> it = subscribers.iterator(); - while (it.hasNext()) { - IActiveEventSubscriber subscriber = it.next(); - if (subscriber.isDone()) { - it.remove(); - } else { - try { - subscriber.notify(event); - } catch (HyracksDataException e) { - LOGGER.log(Level.WARNING, "Failed to notify subscriber", e); - } - if (subscriber.isDone()) { - it.remove(); - } - } - } - } - - public AlgebricksAbsolutePartitionConstraint getLocations() { - return locations; - } - -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java index e6ac265..d102d0c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java @@ -77,10 +77,14 @@ public class AdapterExecutor implements Runnable { // Adapter has completed execution continueIngestion = false; } catch (InterruptedException e) { + adapter.fail(); throw e; } catch (Exception e) { LOGGER.error("Exception during feed ingestion ", e); continueIngestion = adapter.handleException(e); + if (!continueIngestion) { + adapter.fail(); + } failedIngestion = !continueIngestion; restartCount++; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java index 822d725..e21f9eb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java @@ -18,39 +18,48 @@ */ package org.apache.asterix.external.feed.watch; +import org.apache.asterix.active.IActiveEntityEventSubscriber; import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.active.IActiveEventSubscriber; import org.apache.hyracks.api.exceptions.HyracksDataException; -public abstract class AbstractSubscriber implements IActiveEventSubscriber { +public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber { protected final IActiveEntityEventsListener listener; - private boolean done = false; + private volatile boolean done = false; + private volatile Exception failure = null; public AbstractSubscriber(IActiveEntityEventsListener listener) { this.listener = listener; } @Override - public synchronized boolean isDone() { + public boolean isDone() { return done; } - public synchronized void complete() throws HyracksDataException { - done = true; - notifyAll(); + public void complete(Exception failure) { + synchronized (listener) { + if (failure != null) { + this.failure = failure; + } + done = true; + listener.notifyAll(); + } } @Override - public synchronized void sync() throws InterruptedException { - while (!done) { - wait(); + public void sync() throws HyracksDataException, InterruptedException { + synchronized (listener) { + while (!done) { + if (failure != null) { + throw HyracksDataException.create(failure); + } + listener.wait(); + } } } - @Override - public synchronized void unsubscribe() { - done = true; - notifyAll(); + public Exception getFailure() { + return failure; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java index 42f7a74..8230b48 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java @@ -19,14 +19,14 @@ package org.apache.asterix.external.feed.watch; import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.IActiveEntityEventSubscriber; import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.active.IActiveEventSubscriber; import org.apache.hyracks.api.exceptions.HyracksDataException; /** * An event subscriber that does not listen to any events */ -public class NoOpSubscriber implements IActiveEventSubscriber { +public class NoOpSubscriber implements IActiveEntityEventSubscriber { public static final NoOpSubscriber INSTANCE = new NoOpSubscriber(); @@ -49,11 +49,6 @@ public class NoOpSubscriber implements IActiveEventSubscriber { } @Override - public void unsubscribe() { - // no op - } - - @Override public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException { // no op } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java index fa2fa7f..a571904 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java @@ -32,7 +32,17 @@ public class StatsSubscriber extends AbstractSubscriber { @Override public void notify(ActiveEvent event) throws HyracksDataException { if (event.getEventKind() == ActiveEvent.Kind.STATS_UPDATED) { - complete(); + try { + complete(null); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } else if (event.getEventKind() == ActiveEvent.Kind.FAILURE) { + try { + complete((Exception) event.getEventObject()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java index 7bab421..a1cdfb0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java @@ -18,38 +18,46 @@ */ package org.apache.asterix.external.feed.watch; +import java.util.Set; + import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.hyracks.api.exceptions.HyracksDataException; public class WaitForStateSubscriber extends AbstractSubscriber { - private final ActivityState targetState; + private final Set<ActivityState> targetStates; - public WaitForStateSubscriber(IActiveEntityEventsListener listener, ActivityState targetState) + public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates) throws HyracksDataException { super(listener); - this.targetState = targetState; + this.targetStates = targetStates; listener.subscribe(this); } @Override public void notify(ActiveEvent event) throws HyracksDataException { - if (listener.getState() == targetState) { - complete(); + if (targetStates.contains(listener.getState())) { + if (listener.getState() == ActivityState.PERMANENTLY_FAILED + || listener.getState() == ActivityState.TEMPORARILY_FAILED) { + complete(listener.getJobFailure()); + } else { + complete(null); + } + } else if (event != null && event.getEventKind() == ActiveEvent.Kind.FAILURE) { + try { + complete((Exception) event.getEventObject()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } } } @Override public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException { - if (eventsListener.getState() == ActivityState.FAILED) { - throw new RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY); - } - if (listener.getState() == targetState) { - complete(); + if (targetStates.contains(listener.getState())) { + complete(null); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java index 1d8ae5e..f0539c6 100644 --- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java +++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java @@ -25,6 +25,7 @@ import java.util.logging.Logger; import org.apache.asterix.active.EntityId; import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.external.feed.management.FeedConnectionRequest; import org.apache.asterix.external.feed.watch.FeedActivityDetails; @@ -37,7 +38,6 @@ import org.apache.asterix.lang.common.statement.InsertStatement; import org.apache.asterix.lang.common.statement.Query; import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.entities.Feed; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java index 3289d68..3d8fc68 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.lang.common.statement; +import java.util.List; + import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.lang.common.base.Statement; @@ -26,9 +28,6 @@ import org.apache.asterix.lang.common.visitor.base.ILangVisitor; import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; import org.apache.hyracks.algebricks.common.utils.Pair; -import java.util.ArrayList; -import java.util.List; - public class ConnectFeedStatement implements Statement { private final Identifier dataverseName; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java index c84a5bd..08ca03b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java @@ -57,8 +57,7 @@ public class MetadataCache { // Key is dataverse name. Key of value map is dataset name. protected final Map<String, Map<String, Dataset>> datasets = new HashMap<>(); // Key is dataverse name. Key of value map is dataset name. Key of value map of value map is index name. - protected final Map<String, Map<String, Map<String, Index>>> indexes = - new HashMap<>(); + protected final Map<String, Map<String, Map<String, Index>>> indexes = new HashMap<>(); // Key is dataverse name. Key of value map is datatype name. protected final Map<String, Map<String, Datatype>> datatypes = new HashMap<>(); // Key is dataverse name. @@ -66,19 +65,16 @@ public class MetadataCache { // Key is function Identifier . Key of value map is function name. protected final Map<FunctionSignature, Function> functions = new HashMap<>(); // Key is adapter dataverse. Key of value map is the adapter name - protected final Map<String, Map<String, DatasourceAdapter>> adapters = - new HashMap<>(); + protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<>(); // Key is DataverseName, Key of the value map is the Policy name - protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = - new HashMap<>(); + protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = new HashMap<>(); // Key is library dataverse. Key of value map is the library name protected final Map<String, Map<String, Library>> libraries = new HashMap<>(); // Key is library dataverse. Key of value map is the feed name protected final Map<String, Map<String, Feed>> feeds = new HashMap<>(); // Key is DataverseName, Key of the value map is the Policy name - protected final Map<String, Map<String, CompactionPolicy>> compactionPolicies = - new HashMap<>(); + protected final Map<String, Map<String, CompactionPolicy>> compactionPolicies = new HashMap<>(); // Key is DataverseName, Key of value map is feedConnectionId protected final Map<String, Map<String, FeedConnection>> feedConnections = new HashMap<>(); @@ -247,8 +243,7 @@ public class MetadataCache { datatypes.remove(dataverse.getDataverseName()); adapters.remove(dataverse.getDataverseName()); compactionPolicies.remove(dataverse.getDataverseName()); - List<FunctionSignature> markedFunctionsForRemoval = - new ArrayList<>(); + List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<>(); for (FunctionSignature signature : functions.keySet()) { if (signature.getNamespace().equals(dataverse.getDataverseName())) { markedFunctionsForRemoval.add(signature); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataException.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataException.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataException.java deleted file mode 100644 index 61c21f4..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.metadata; - -import org.apache.asterix.common.exceptions.CompilationException; - -import java.io.Serializable; - -public class MetadataException extends CompilationException { - private static final long serialVersionUID = 1L; - - public MetadataException(String message) { - super(message); - } - - public MetadataException(Throwable cause) { - super(cause); - } - - public MetadataException(String message, Throwable cause) { - super(message, cause); - } - - public MetadataException(int errorCode, Serializable... params) { - super(errorCode, params); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index 11645e8..5e6d11a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -28,6 +28,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.MetadataException; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.external.indexing.ExternalFile; @@ -149,7 +152,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addDataverse(ctx.getJobId(), dataverse); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.addDataverse(dataverse); } @@ -159,7 +162,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropDataverse(ctx.getJobId(), dataverseName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.dropDataverse(dataverseName); } @@ -169,7 +172,7 @@ public class MetadataManager implements IMetadataManager { try { return metadataNode.getDataverses(ctx.getJobId()); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -196,7 +199,7 @@ public class MetadataManager implements IMetadataManager { try { dataverse = metadataNode.getDataverse(ctx.getJobId(), dataverseName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // We fetched the dataverse from the MetadataNode. Add it to the cache // when this transaction commits. @@ -227,7 +230,7 @@ public class MetadataManager implements IMetadataManager { // metadata node. dataverseDatasets.addAll(metadataNode.getDataverseDatasets(ctx.getJobId(), dataverseName)); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // Don't update the cache to avoid checking against the transaction's // uncommitted datasets. @@ -241,7 +244,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addDataset(ctx.getJobId(), dataset); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -258,7 +261,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropDataset(ctx.getJobId(), dataverseName, datasetName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -292,7 +295,7 @@ public class MetadataManager implements IMetadataManager { try { dataset = metadataNode.getDataset(ctx.getJobId(), dataverseName, datasetName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // We fetched the dataset from the MetadataNode. Add it to the cache // when this transaction commits. @@ -318,7 +321,7 @@ public class MetadataManager implements IMetadataManager { // for persistent datasets datasetIndexes = metadataNode.getDatasetIndexes(ctx.getJobId(), dataverseName, datasetName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } return datasetIndexes; @@ -330,7 +333,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addCompactionPolicy(mdTxnCtx.getJobId(), compactionPolicy); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } mdTxnCtx.addCompactionPolicy(compactionPolicy); } @@ -343,7 +346,7 @@ public class MetadataManager implements IMetadataManager { try { compactionPolicy = metadataNode.getCompactionPolicy(ctx.getJobId(), dataverse, policyName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return compactionPolicy; } @@ -353,13 +356,13 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addDatatype(ctx.getJobId(), datatype); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } try { ctx.addDatatype( metadataNode.getDatatype(ctx.getJobId(), datatype.getDataverseName(), datatype.getDatatypeName())); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -369,7 +372,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropDatatype(ctx.getJobId(), dataverseName, datatypeName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.dropDataDatatype(dataverseName, datatypeName); } @@ -406,7 +409,7 @@ public class MetadataManager implements IMetadataManager { try { datatype = metadataNode.getDatatype(ctx.getJobId(), dataverseName, datatypeName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // We fetched the datatype from the MetadataNode. Add it to the cache // when this transaction commits. @@ -425,7 +428,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addIndex(ctx.getJobId(), index); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } ctx.addIndex(index); @@ -436,7 +439,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addAdapter(mdTxnCtx.getJobId(), adapter); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } mdTxnCtx.addAdapter(adapter); @@ -452,7 +455,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropIndex(ctx.getJobId(), dataverseName, datasetName, indexName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } ctx.dropIndex(dataverseName, datasetName, indexName); @@ -485,7 +488,7 @@ public class MetadataManager implements IMetadataManager { try { index = metadataNode.getIndex(ctx.getJobId(), dataverseName, datasetName, indexName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // We fetched the index from the MetadataNode. Add it to the cache // when this transaction commits. @@ -500,7 +503,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addNode(ctx.getJobId(), node); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -509,7 +512,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addNodeGroup(ctx.getJobId(), nodeGroup); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.addNodeGroup(nodeGroup); } @@ -521,7 +524,7 @@ public class MetadataManager implements IMetadataManager { try { dropped = metadataNode.dropNodegroup(ctx.getJobId(), nodeGroupName, failSilently); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } if (dropped) { ctx.dropNodeGroup(nodeGroupName); @@ -551,7 +554,7 @@ public class MetadataManager implements IMetadataManager { try { nodeGroup = metadataNode.getNodeGroup(ctx.getJobId(), nodeGroupName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // We fetched the nodeGroup from the MetadataNode. Add it to the cache // when this transaction commits. @@ -566,7 +569,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.updateFunction(mdTxnCtx.getJobId(), function); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } mdTxnCtx.dropFunction(function); mdTxnCtx.addFunction(function); @@ -577,7 +580,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addFunction(mdTxnCtx.getJobId(), function); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } mdTxnCtx.addFunction(function); } @@ -588,7 +591,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropFunction(ctx.getJobId(), functionSignature); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.dropFunction(functionSignature); } @@ -622,7 +625,7 @@ public class MetadataManager implements IMetadataManager { try { function = metadataNode.getFunction(ctx.getJobId(), functionSignature); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // We fetched the function from the MetadataNode. Add it to the cache // when this transaction commits. @@ -639,7 +642,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addFeedPolicy(mdTxnCtx.getJobId(), feedPolicy); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } mdTxnCtx.addFeedPolicy(feedPolicy); } @@ -649,7 +652,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.initializeDatasetIdFactory(ctx.getJobId()); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -658,7 +661,7 @@ public class MetadataManager implements IMetadataManager { try { return metadataNode.getMostRecentDatasetId(); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -671,7 +674,7 @@ public class MetadataManager implements IMetadataManager { // metadata node. dataverseFunctions = metadataNode.getDataverseFunctions(ctx.getJobId(), dataverseName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // Don't update the cache to avoid checking against the transaction's // uncommitted functions. @@ -684,7 +687,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropAdapter(ctx.getJobId(), dataverseName, name); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -695,7 +698,7 @@ public class MetadataManager implements IMetadataManager { try { adapter = metadataNode.getAdapter(ctx.getJobId(), dataverseName, name); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return adapter; } @@ -706,7 +709,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropLibrary(ctx.getJobId(), dataverseName, libraryName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.dropLibrary(dataverseName, libraryName); } @@ -720,7 +723,7 @@ public class MetadataManager implements IMetadataManager { // metadata node. dataverseLibaries = metadataNode.getDataverseLibraries(ctx.getJobId(), dataverseName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // Don't update the cache to avoid checking against the transaction's // uncommitted functions. @@ -732,7 +735,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addLibrary(ctx.getJobId(), library); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.addLibrary(library); } @@ -744,7 +747,7 @@ public class MetadataManager implements IMetadataManager { try { library = metadataNode.getLibrary(ctx.getJobId(), dataverseName, libraryName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return library; } @@ -777,7 +780,7 @@ public class MetadataManager implements IMetadataManager { try { feedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return feedPolicy; } @@ -788,7 +791,7 @@ public class MetadataManager implements IMetadataManager { try { feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return feed; } @@ -806,7 +809,7 @@ public class MetadataManager implements IMetadataManager { ctx.dropFeedConnection(dataverse, feedName, feedConnection.getDatasetName()); } } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.dropFeed(feed); } @@ -816,7 +819,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addFeed(ctx.getJobId(), feed); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.addFeed(feed); } @@ -827,7 +830,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addFeedConnection(ctx.getJobId(), feedConnection); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.addFeedConnection(feedConnection); } @@ -838,7 +841,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropFeedConnection(ctx.getJobId(), dataverseName, feedName, datasetName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } ctx.dropFeedConnection(dataverseName, feedName, datasetName); } @@ -849,7 +852,7 @@ public class MetadataManager implements IMetadataManager { try { return metadataNode.getFeedConnection(ctx.getJobId(), dataverseName, feedName, datasetName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -859,7 +862,7 @@ public class MetadataManager implements IMetadataManager { try { return metadataNode.getFeedConnections(ctx.getJobId(), dataverseName, feedName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -870,7 +873,7 @@ public class MetadataManager implements IMetadataManager { try { dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getJobId(), dataverse); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return dataverseAdapters; } @@ -883,7 +886,7 @@ public class MetadataManager implements IMetadataManager { feedPolicy = metadataNode.getFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName); metadataNode.dropFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } mdTxnCtx.dropFeedPolicy(feedPolicy); } @@ -894,7 +897,7 @@ public class MetadataManager implements IMetadataManager { try { dataverseFeedPolicies = metadataNode.getDataversePolicies(mdTxnCtx.getJobId(), dataverse); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return dataverseFeedPolicies; } @@ -906,7 +909,7 @@ public class MetadataManager implements IMetadataManager { try { externalFiles = metadataNode.getExternalFiles(mdTxnCtx.getJobId(), dataset); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return externalFiles; } @@ -916,7 +919,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addExternalFile(ctx.getJobId(), externalFile); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -926,7 +929,7 @@ public class MetadataManager implements IMetadataManager { metadataNode.dropExternalFile(ctx.getJobId(), externalFile.getDataverseName(), externalFile.getDatasetName(), externalFile.getFileNumber()); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -937,7 +940,7 @@ public class MetadataManager implements IMetadataManager { try { file = metadataNode.getExternalFile(ctx.getJobId(), dataverseName, datasetName, fileNumber); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } return file; } @@ -949,7 +952,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.dropExternalFiles(mdTxnCtx.getJobId(), dataset); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -958,7 +961,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.updateDataset(ctx.getJobId(), dataset); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } // reflect the dataset into the cache ctx.dropDataset(dataset.getDataverseName(), dataset.getDatasetName()); @@ -984,7 +987,17 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.addEntity(mdTxnCtx.getJobId(), entity); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); + } + } + + @Override + public <T extends IExtensionMetadataEntity> void upsertEntity(MetadataTransactionContext mdTxnCtx, T entity) + throws MetadataException { + try { + metadataNode.upsertEntity(mdTxnCtx.getJobId(), entity); + } catch (RemoteException e) { + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -994,7 +1007,7 @@ public class MetadataManager implements IMetadataManager { try { metadataNode.deleteEntity(mdTxnCtx.getJobId(), entity); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -1004,7 +1017,7 @@ public class MetadataManager implements IMetadataManager { try { return metadataNode.getEntities(mdTxnCtx.getJobId(), searchKey); } catch (RemoteException e) { - throw new MetadataException(e); + throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } } @@ -1046,9 +1059,9 @@ public class MetadataManager implements IMetadataManager { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } catch (RemoteException e) { - throw new HyracksDataException(e); + throw new RuntimeDataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } super.init(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index 84cb671..265e533 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -34,6 +34,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.dataflow.LSMIndexUtil; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.transactions.AbstractOperationCallback; @@ -97,6 +98,7 @@ import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback; +import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallback; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.transaction.management.service.transaction.TransactionContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; @@ -124,8 +126,8 @@ import org.apache.hyracks.storage.common.MultiComparator; public class MetadataNode implements IMetadataNode { private static final long serialVersionUID = 1L; private static final Logger LOGGER = Logger.getLogger(MetadataNode.class.getName()); - private static final DatasetId METADATA_DATASET_ID = new ImmutableDatasetId( - MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId()); + private static final DatasetId METADATA_DATASET_ID = + new ImmutableDatasetId(MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId()); // shared between core and extension private IDatasetLifecycleManager datasetLifecycleManager; @@ -173,8 +175,8 @@ public class MetadataNode implements IMetadataNode { @Override public void abortTransaction(JobId jobId) throws RemoteException, ACIDException { try { - ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, - false); + ITransactionContext txnCtx = + transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, DatasetId.NULL, -1); } catch (ACIDException e) { LOGGER.log(Level.WARNING, "Exception aborting transaction", e); @@ -215,6 +217,25 @@ public class MetadataNode implements IMetadataNode { } /** + * Upsert entity to index + * + * @param jobId + * @param entity + * @param tupleTranslator + * @param index + * @throws MetadataException + */ + private <T> void upsertEntity(JobId jobId, T entity, IMetadataEntityTupleTranslator<T> tupleTranslator, + IMetadataIndex index) throws MetadataException { + try { + ITupleReference tuple = tupleTranslator.getTupleFromMetadataEntity(entity); + upsertTupleIntoIndex(jobId, index, tuple); + } catch (HyracksDataException | ACIDException e) { + throw new MetadataException(e); + } + } + + /** * Delete entity from index * * @param jobId @@ -271,6 +292,18 @@ public class MetadataNode implements IMetadataNode { @SuppressWarnings("unchecked") @Override + public <T extends IExtensionMetadataEntity> void upsertEntity(JobId jobId, T entity) + throws MetadataException, RemoteException { + ExtensionMetadataDataset<T> index = (ExtensionMetadataDataset<T>) extensionDatasets.get(entity.getDatasetId()); + if (index == null) { + throw new MetadataException("Metadata Extension Index: " + entity.getDatasetId() + " was not found"); + } + IMetadataEntityTupleTranslator<T> tupleTranslator = index.getTupleTranslator(); + upsertEntity(jobId, entity, tupleTranslator, index); + } + + @SuppressWarnings("unchecked") + @Override public <T extends IExtensionMetadataEntity> void deleteEntity(JobId jobId, T entity) throws MetadataException, RemoteException { ExtensionMetadataDataset<T> index = (ExtensionMetadataDataset<T>) extensionDatasets.get(entity.getDatasetId()); @@ -285,8 +318,8 @@ public class MetadataNode implements IMetadataNode { @Override public <T extends IExtensionMetadataEntity> List<T> getEntities(JobId jobId, IExtensionMetadataSearchKey searchKey) throws MetadataException, RemoteException { - ExtensionMetadataDataset<T> index = (ExtensionMetadataDataset<T>) extensionDatasets - .get(searchKey.getDatasetId()); + ExtensionMetadataDataset<T> index = + (ExtensionMetadataDataset<T>) extensionDatasets.get(searchKey.getDatasetId()); if (index == null) { throw new MetadataException("Metadata Extension Index: " + searchKey.getDatasetId() + " was not found"); } @@ -396,8 +429,8 @@ public class MetadataNode implements IMetadataNode { @Override public void addDatatype(JobId jobId, Datatype datatype) throws MetadataException, RemoteException { try { - DatatypeTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, - true); + DatatypeTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, true); ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(datatype); insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, tuple); } catch (HyracksDataException e) { @@ -443,13 +476,13 @@ public class MetadataNode implements IMetadataNode { datasetLifecycleManager.open(resourceName); // prepare a Callback for logging - IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, - metadataIndex, lsmIndex, Operation.INSERT); + IModificationOperationCallback modCallback = + createIndexModificationCallback(jobId, resourceID, metadataIndex, lsmIndex, Operation.INSERT); ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE); - ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, - false); + ITransactionContext txnCtx = + transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); txnCtx.setWriteTxn(true); txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback, metadataIndex.isPrimaryIndex()); @@ -471,6 +504,39 @@ public class MetadataNode implements IMetadataNode { } } + private void upsertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple) + throws ACIDException, HyracksDataException { + long resourceId = metadataIndex.getResourceId(); + String resourceName = metadataIndex.getFile().getRelativePath(); + ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName); + datasetLifecycleManager.open(resourceName); + try { + // prepare a Callback for logging + ITransactionContext txnCtx = + transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); + IModificationOperationCallback modCallback = + new UpsertOperationCallback(metadataIndex.getDatasetId(), metadataIndex.getPrimaryKeyIndexes(), + txnCtx, transactionSubsystem.getLockManager(), transactionSubsystem, resourceId, + metadataStoragePartition, ResourceType.LSM_BTREE, Operation.UPSERT); + ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE); + txnCtx.setWriteTxn(true); + txnCtx.registerIndexAndCallback(resourceId, lsmIndex, (AbstractOperationCallback) modCallback, + metadataIndex.isPrimaryIndex()); + LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager()); + indexAccessor.forceUpsert(tuple); + //Manually complete the operation after the insert. This is to decrement the resource counters within the + //index that determine how many tuples are still 'in-flight' within the index. Normally the log flusher + //does this. The only exception is the index registered as the "primary" which we will let be decremented + //by the job commit log event + if (!((TransactionContext) txnCtx).getPrimaryIndexOpTracker().equals(lsmIndex.getOperationTracker())) { + lsmIndex.getOperationTracker().completeOperation(lsmIndex, LSMOperationType.FORCE_MODIFICATION, null, + modCallback); + } + } finally { + datasetLifecycleManager.close(resourceName); + } + } + private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId, IMetadataIndex metadataIndex, ILSMIndex lsmIndex, Operation indexOp) throws ACIDException { ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); @@ -748,12 +814,12 @@ public class MetadataNode implements IMetadataNode { try { datasetLifecycleManager.open(resourceName); // prepare a Callback for logging - IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, - metadataIndex, lsmIndex, Operation.DELETE); + IModificationOperationCallback modCallback = + createIndexModificationCallback(jobId, resourceID, metadataIndex, lsmIndex, Operation.DELETE); ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE); - ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, - false); + ITransactionContext txnCtx = + transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); txnCtx.setWriteTxn(true); txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback, metadataIndex.isPrimaryIndex()); @@ -852,8 +918,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataverseName); - DatatypeTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, - false); + DatatypeTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, false); IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); List<Datatype> results = new ArrayList<>(); searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results); @@ -897,8 +963,8 @@ public class MetadataNode implements IMetadataNode { public List<Datatype> getAllDatatypes(JobId jobId) throws MetadataException, RemoteException { try { ITupleReference searchKey = null; - DatatypeTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, - false); + DatatypeTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, false); IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); List<Datatype> results = new ArrayList<>(); searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results); @@ -1010,8 +1076,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataverseName, datasetName, indexName); - IndexTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getIndexTupleTranslator(jobId, this, - false); + IndexTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getIndexTupleTranslator(jobId, this, false); IValueExtractor<Index> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); List<Index> results = new ArrayList<>(); searchIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey, valueExtractor, results); @@ -1029,8 +1095,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataverseName, datasetName); - IndexTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getIndexTupleTranslator(jobId, this, - false); + IndexTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getIndexTupleTranslator(jobId, this, false); IValueExtractor<Index> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); List<Index> results = new ArrayList<>(); searchIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey, valueExtractor, results); @@ -1045,8 +1111,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataverseName, datatypeName); - DatatypeTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, - false); + DatatypeTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, false); IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); List<Datatype> results = new ArrayList<>(); searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results); @@ -1111,8 +1177,8 @@ public class MetadataNode implements IMetadataNode { "" + functionSignature.getArity()); // Searches the index for the tuple to be deleted. Acquires an S // lock on the 'function' dataset. - ITupleReference functionTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, - searchKey); + ITupleReference functionTuple = + getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, searchKey); deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, functionTuple); // TODO: Change this to be a BTree specific exception, e.g., @@ -1153,8 +1219,8 @@ public class MetadataNode implements IMetadataNode { String resourceName = index.getFile().toString(); IIndex indexInstance = datasetLifecycleManager.get(resourceName); datasetLifecycleManager.open(resourceName); - IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + IIndexAccessor indexAccessor = + indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); RangePredicate rangePred = null; @@ -1174,8 +1240,8 @@ public class MetadataNode implements IMetadataNode { index = MetadataPrimaryIndexes.DATASET_DATASET; indexInstance = datasetLifecycleManager.get(resourceName); datasetLifecycleManager.open(resourceName); - indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + indexAccessor = + indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); rangePred = null; @@ -1196,8 +1262,8 @@ public class MetadataNode implements IMetadataNode { index = MetadataPrimaryIndexes.INDEX_DATASET; indexInstance = datasetLifecycleManager.get(resourceName); datasetLifecycleManager.open(resourceName); - indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + indexAccessor = + indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); rangePred = null; @@ -1232,8 +1298,8 @@ public class MetadataNode implements IMetadataNode { String resourceName = index.getFile().getRelativePath(); IIndex indexInstance = datasetLifecycleManager.get(resourceName); datasetLifecycleManager.open(resourceName); - IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + IIndexAccessor indexAccessor = + indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); IBinaryComparator[] searchCmps = null; @@ -1271,8 +1337,8 @@ public class MetadataNode implements IMetadataNode { IIndex indexInstance = datasetLifecycleManager.get(resourceName); datasetLifecycleManager.open(resourceName); try { - IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); + IIndexAccessor indexAccessor = + indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false); DatasetTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDatasetTupleTranslator(false); @@ -1310,8 +1376,8 @@ public class MetadataNode implements IMetadataNode { // Hyracks version. public static ITupleReference createTuple(String... fields) { @SuppressWarnings("unchecked") - ISerializerDeserializer<AString> stringSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ASTRING); + ISerializerDeserializer<AString> stringSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); AMutableString aString = new AMutableString(""); ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fields.length); for (String s : fields) { @@ -1348,8 +1414,8 @@ public class MetadataNode implements IMetadataNode { public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException { try { // Insert into the 'Adapter' dataset. - DatasourceAdapterTupleTranslator tupleReaderWriter = tupleTranslatorProvider - .getAdapterTupleTranslator(true); + DatasourceAdapterTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getAdapterTupleTranslator(true); ITupleReference adapterTuple = tupleReaderWriter.getTupleFromMetadataEntity(adapter); insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, adapterTuple); } catch (HyracksDataException e) { @@ -1376,8 +1442,8 @@ public class MetadataNode implements IMetadataNode { ITupleReference searchKey = createTuple(dataverseName, adapterName); // Searches the index for the tuple to be deleted. Acquires an S // lock on the 'Adapter' dataset. - ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, - searchKey); + ITupleReference datasetTuple = + getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey); deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, datasetTuple); // TODO: Change this to be a BTree specific exception, e.g., @@ -1400,8 +1466,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataverseName, adapterName); - DatasourceAdapterTupleTranslator tupleReaderWriter = tupleTranslatorProvider - .getAdapterTupleTranslator(false); + DatasourceAdapterTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getAdapterTupleTranslator(false); List<DatasourceAdapter> results = new ArrayList<>(); IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); searchIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey, valueExtractor, results); @@ -1419,8 +1485,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { // Insert into the 'CompactionPolicy' dataset. - CompactionPolicyTupleTranslator tupleReaderWriter = tupleTranslatorProvider - .getCompactionPolicyTupleTranslator(true); + CompactionPolicyTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getCompactionPolicyTupleTranslator(true); ITupleReference compactionPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(compactionPolicy); insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, compactionPolicyTuple); } catch (HyracksDataException e) { @@ -1440,8 +1506,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataverse, policyName); - CompactionPolicyTupleTranslator tupleReaderWriter = tupleTranslatorProvider - .getCompactionPolicyTupleTranslator(false); + CompactionPolicyTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getCompactionPolicyTupleTranslator(false); List<CompactionPolicy> results = new ArrayList<>(); IValueExtractor<CompactionPolicy> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); searchIndex(jobId, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, searchKey, valueExtractor, results); @@ -1459,8 +1525,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataverseName); - DatasourceAdapterTupleTranslator tupleReaderWriter = tupleTranslatorProvider - .getAdapterTupleTranslator(false); + DatasourceAdapterTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getAdapterTupleTranslator(false); IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); List<DatasourceAdapter> results = new ArrayList<>(); searchIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey, valueExtractor, results); @@ -1502,8 +1568,8 @@ public class MetadataNode implements IMetadataNode { ITupleReference searchKey = createTuple(dataverseName, libraryName); // Searches the index for the tuple to be deleted. Acquires an S // lock on the 'Adapter' dataset. - ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, - searchKey); + ITupleReference datasetTuple = + getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey); deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, datasetTuple); // TODO: Change this to be a BTree specific exception, e.g., @@ -1630,8 +1696,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataverseName, feedName, datasetName); - ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, - searchKey); + ITupleReference tuple = + getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, searchKey); deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, tuple); } catch (HyracksDataException | ACIDException e) { throw new MetadataException(e); @@ -1734,8 +1800,8 @@ public class MetadataNode implements IMetadataNode { public void addExternalFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException { try { // Insert into the 'externalFiles' dataset. - ExternalFileTupleTranslator tupleReaderWriter = tupleTranslatorProvider - .getExternalFileTupleTranslator(true); + ExternalFileTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getExternalFileTupleTranslator(true); ITupleReference externalFileTuple = tupleReaderWriter.getTupleFromMetadataEntity(externalFile); insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, externalFileTuple); } catch (HyracksDataException e) { @@ -1755,8 +1821,8 @@ public class MetadataNode implements IMetadataNode { public List<ExternalFile> getExternalFiles(JobId jobId, Dataset dataset) throws MetadataException, RemoteException { try { ITupleReference searchKey = createTuple(dataset.getDataverseName(), dataset.getDatasetName()); - ExternalFileTupleTranslator tupleReaderWriter = tupleTranslatorProvider - .getExternalFileTupleTranslator(false); + ExternalFileTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getExternalFileTupleTranslator(false); IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); List<ExternalFile> results = new ArrayList<>(); searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results); @@ -1774,8 +1840,8 @@ public class MetadataNode implements IMetadataNode { ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber); // Searches the index for the tuple to be deleted. Acquires an S // lock on the 'ExternalFile' dataset. - ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, - searchKey); + ITupleReference datasetTuple = + getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey); deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, datasetTuple); } catch (HyracksDataException e) { if (e.getComponent().equals(ErrorCode.HYRACKS) @@ -1803,10 +1869,10 @@ public class MetadataNode implements IMetadataNode { @SuppressWarnings("unchecked") public ITupleReference createExternalFileSearchTuple(String dataverseName, String datasetName, int fileNumber) throws HyracksDataException { - ISerializerDeserializer<AString> stringSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ASTRING); - ISerializerDeserializer<AInt32> intSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AINT32); + ISerializerDeserializer<AString> stringSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); + ISerializerDeserializer<AInt32> intSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); AMutableString aString = new AMutableString(""); ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(3); @@ -1835,8 +1901,8 @@ public class MetadataNode implements IMetadataNode { throws MetadataException, RemoteException { try { ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber); - ExternalFileTupleTranslator tupleReaderWriter = tupleTranslatorProvider - .getExternalFileTupleTranslator(false); + ExternalFileTupleTranslator tupleReaderWriter = + tupleTranslatorProvider.getExternalFileTupleTranslator(false); IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter); List<ExternalFile> results = new ArrayList<>(); searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results); @@ -1858,8 +1924,8 @@ public class MetadataNode implements IMetadataNode { searchKey = createTuple(dataset.getDataverseName(), dataset.getDatasetName()); // Searches the index for the tuple to be deleted. Acquires an S // lock on the 'dataset' dataset. - ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASET_DATASET, - searchKey); + ITupleReference datasetTuple = + getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASET_DATASET, searchKey); deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple); // Previous tuple was deleted // Insert into the 'dataset' dataset. @@ -1876,10 +1942,10 @@ public class MetadataNode implements IMetadataNode { try { // remove old function ITupleReference searchKey; - searchKey = createTuple(function.getDataverseName(), function.getName(), - Integer.toString(function.getArity())); - ITupleReference functionTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, - searchKey); + searchKey = + createTuple(function.getDataverseName(), function.getName(), Integer.toString(function.getArity())); + ITupleReference functionTuple = + getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, searchKey); deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, functionTuple); // add new function FunctionTupleTranslator functionTupleTranslator = tupleTranslatorProvider.getFunctionTupleTranslator(true);
