http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java new file mode 100644 index 0000000..126ec19 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Keep track of the runnable procedures + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface ProcedureRunnableSet { + /** + * Inserts the specified element at the front of this queue. + * @param proc the Procedure to add + */ + void addFront(Procedure proc); + + /** + * Inserts the specified element at the end of this queue. 
+ * @param proc the Procedure to add + */ + void addBack(Procedure proc); + + /** + * The procedure can't run at the moment. + * add it back to the queue, giving priority to someone else. + * @param proc the Procedure to add back to the list + */ + void yield(Procedure proc); + + /** + * Fetch one Procedure from the queue + * @return the Procedure ID to execute, or null if nothing present. + */ + Long poll(); + + /** + * In case the class is blocking on poll() waiting for items to be added, + * this method should awake poll() and poll() should return. + */ + void signalAll(); + + /** + * Returns the number of elements in this collection. + * @return the number of elements in this collection. + */ + int size(); + + /** + * Removes all of the elements from this collection. + */ + void clear(); +}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
 * Simple FIFO run queue for procedures.
 * Every operation is guarded by a single ReentrantLock; {@link #poll()}
 * blocks on a Condition until an element is added or {@link #signalAll()}
 * wakes it up (in which case it may return null, e.g. on shutdown).
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
  private final Deque<Long> runnables = new ArrayDeque<Long>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition waitCond = lock.newCondition();

  @Override
  public void addFront(final Procedure proc) {
    lock.lock();
    try {
      runnables.addFirst(proc.getProcId());
      waitCond.signal();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void addBack(final Procedure proc) {
    lock.lock();
    try {
      runnables.addLast(proc.getProcId());
      waitCond.signal();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void yield(final Procedure proc) {
    // No priority handling in this simple implementation: just requeue at the tail.
    addBack(proc);
  }

  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
  public Long poll() {
    lock.lock();
    try {
      if (runnables.isEmpty()) {
        // Deliberately a single await, not an await-in-loop (hence the findbugs
        // suppression): signalAll() must be able to wake an idle poller so it
        // can return null and let the caller observe a shutdown request.
        waitCond.await();
      }
      // May still be empty after a signalAll() wake-up or a spurious wake-up.
      return runnables.isEmpty() ? null : runnables.pop();
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers up the stack can see it;
      // the original code silently swallowed the interruption.
      Thread.currentThread().interrupt();
      return null;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void signalAll() {
    lock.lock();
    try {
      waitCond.signalAll();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void clear() {
    lock.lock();
    try {
      runnables.clear();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public int size() {
    lock.lock();
    try {
      return runnables.size();
    } finally {
      lock.unlock();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java new file mode 100644 index 0000000..177ff5b --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +// TODO: Not used yet +@InterfaceAudience.Public +@InterfaceStability.Stable +public class ProcedureYieldException extends ProcedureException { + /** default constructor */ + public ProcedureYieldException() { + super(); + } + + /** + * Constructor + * @param s message + */ + public ProcedureYieldException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java new file mode 100644 index 0000000..6be512d --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;

import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;

import com.google.protobuf.InvalidProtocolBufferException;

/**
 * A RemoteProcedureException is an exception from another thread or process.
 * <p>
 * RemoteProcedureExceptions are sent to 'remote' peers to signal an abort in the face of failures.
 * When serialized for transmission we encode using Protobufs to ensure version compatibility.
 * <p>
 * RemoteProcedureException exceptions contain a Throwable as its cause.
 * This can be a "regular" exception generated locally or a ProxyThrowable that is a representation
 * of the original exception created on original 'remote' source. These ProxyThrowables have their
 * stack traces and messages overridden to reflect the original 'remote' exception.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("serial")
public class RemoteProcedureException extends ProcedureException {

  /**
   * Name of the throwable's source such as a host or thread name. Must be non-null.
   */
  private final String source;

  /**
   * Create a new RemoteProcedureException that can be serialized.
   * It is assumed that this came from a local source.
   * @param source name of the host/thread where the exception originated; must be non-null
   * @param cause the actual exception being wrapped; must be non-null
   */
  public RemoteProcedureException(String source, Throwable cause) {
    super(cause);
    // NOTE(review): these are asserts, so with assertions disabled a null
    // source/cause slips through silently — callers must never pass null.
    assert source != null;
    assert cause != null;
    this.source = source;
  }

  /** @return the name of the source (host/thread) this exception came from. */
  public String getSource() {
    return source;
  }

  /**
   * Unwraps the cause into an IOException: Hadoop RemoteExceptions are
   * unwrapped to their original type, IOExceptions pass through unchanged,
   * and anything else is wrapped in a new IOException.
   */
  public IOException unwrapRemoteException() {
    if (getCause() instanceof RemoteException) {
      return ((RemoteException)getCause()).unwrapRemoteException();
    }
    if (getCause() instanceof IOException) {
      return (IOException)getCause();
    }
    return new IOException(getCause());
  }

  @Override
  public String toString() {
    String className = getCause().getClass().getName();
    return className + " via " + getSource() + ":" + getLocalizedMessage();
  }

  /**
   * Converts a RemoteProcedureException to an array of bytes.
   * @param source the name of the external exception source
   * @param t the "local" external exception (local)
   * @return protobuf serialized version of RemoteProcedureException
   */
  public static byte[] serialize(String source, Throwable t) {
    return toProto(source, t).toByteArray();
  }

  /**
   * Takes a series of bytes and tries to generate an RemoteProcedureException instance for it.
   * @param bytes the protobuf-serialized form produced by {@link #serialize}
   * @return the ForeignException instance
   * @throws InvalidProtocolBufferException if there was deserialization problem this is thrown.
   */
  public static RemoteProcedureException deserialize(byte[] bytes)
      throws InvalidProtocolBufferException {
    return fromProto(ForeignExceptionMessage.parseFrom(bytes));
  }

  /** @return this exception converted to its protobuf message form. */
  public ForeignExceptionMessage convert() {
    return ForeignExceptionUtil.toProtoForeignException(getSource(), getCause());
  }

  /** Converts an arbitrary throwable plus its source into the protobuf message form. */
  public static ForeignExceptionMessage toProto(String source, Throwable t) {
    return ForeignExceptionUtil.toProtoForeignException(source, t);
  }

  /** Rebuilds a RemoteProcedureException from its protobuf message form. */
  public static RemoteProcedureException fromProto(final ForeignExceptionMessage eem) {
    return new RemoteProcedureException(eem.getSource(), ForeignExceptionUtil.toIOException(eem));
  }
}
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure".
 * A "Root Procedure" is a Procedure without parent, each subprocedure will be
 * added to the "Root Procedure" stack (or rollback-stack).
 *
 * RootProcedureState is used and managed only by the ProcedureExecutor.
 *    Long rootProcId = getRootProcedureId(proc);
 *    rollbackStack.get(rootProcId).acquire(proc)
 *    rollbackStack.get(rootProcId).release(proc)
 *    ...
 *
 * All state transitions are serialized via synchronized methods on this object.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class RootProcedureState {
  private static final Log LOG = LogFactory.getLog(RootProcedureState.class);

  private enum State {
    RUNNING,         // The Procedure is running or ready to run
    FAILED,          // The Procedure failed, waiting for the rollback executing
    ROLLINGBACK,     // The Procedure failed and the rollback is being executed
  }

  // Rollback stack: every executed step is appended here so it can be
  // rolled back in reverse order. Lazily allocated; slots may transiently be
  // null while reloading from the store (see loadStack/isValid).
  private ArrayList<Procedure> subprocedures = null;
  private State state = State.RUNNING;
  // Number of procedure steps currently executing (acquired but not released).
  private int running = 0;

  /** @return true if the root procedure has failed or is already rolling back. */
  public synchronized boolean isFailed() {
    switch (state) {
      case ROLLINGBACK:
      case FAILED:
        return true;
      default:
        break;
    }
    return false;
  }

  /** @return true if the rollback has been started. */
  public synchronized boolean isRollingback() {
    return state == State.ROLLINGBACK;
  }

  /**
   * Called by the ProcedureExecutor to mark rollback execution.
   * Succeeds only once every running step has been released and the state is
   * FAILED, so rollback never overlaps a still-running step.
   * @return true if the state moved to ROLLINGBACK, false otherwise
   */
  protected synchronized boolean setRollback() {
    if (running == 0 && state == State.FAILED) {
      state = State.ROLLINGBACK;
      return true;
    }
    return false;
  }

  /** @return the rollback stack (may be null if nothing was executed yet). */
  protected synchronized List<Procedure> getSubprocedures() {
    return subprocedures;
  }

  /**
   * @return the exception of the first failed subprocedure found on the stack,
   *         or null if none of them carries one.
   */
  protected synchronized RemoteProcedureException getException() {
    if (subprocedures != null) {
      for (Procedure proc: subprocedures) {
        if (proc.hasException()) {
          return proc.getException();
        }
      }
    }
    return null;
  }

  /**
   * Called by the ProcedureExecutor to mark the procedure step as running.
   * @return false (step must not run) once the root state is no longer RUNNING
   */
  protected synchronized boolean acquire(final Procedure proc) {
    if (state != State.RUNNING) return false;

    running++;
    return true;
  }

  /**
   * Called by the ProcedureExecutor to mark the procedure step as finished.
   */
  protected synchronized void release(final Procedure proc) {
    running--;
  }

  /** Marks the root procedure as failed, unless a rollback already started. */
  protected synchronized void abort() {
    if (state == State.RUNNING) {
      state = State.FAILED;
    }
  }

  /**
   * Called by the ProcedureExecutor after the procedure step is completed,
   * to add the step to the rollback list (or procedure stack).
   * Also records the step's position on the stack inside the procedure itself,
   * so it can be persisted and the stack rebuilt on load.
   */
  protected synchronized void addRollbackStep(final Procedure proc) {
    if (proc.isFailed()) {
      state = State.FAILED;
    }
    if (subprocedures == null) {
      subprocedures = new ArrayList<Procedure>();
    }
    proc.addStackIndex(subprocedures.size());
    subprocedures.add(proc);
  }

  /**
   * Called on store load by the ProcedureExecutor to load part of the stack.
   *
   * Each procedure has its own stack-positions. Which means we have to write
   * to the store only the Procedure we executed, and nothing else.
   * on load we recreate the full stack by aggregating each procedure stack-positions.
   */
  protected synchronized void loadStack(final Procedure proc) {
    int[] stackIndexes = proc.getStackIndexes();
    if (stackIndexes != null) {
      if (subprocedures == null) {
        subprocedures = new ArrayList<Procedure>();
      }
      // Grow the stack with null placeholders up to the highest index this
      // procedure occupies; other procedures will fill the remaining slots.
      int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocedures.size();
      if (diff > 0) {
        subprocedures.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
        while (diff-- > 0) subprocedures.add(null);
      }
      for (int i = 0; i < stackIndexes.length; ++i) {
        subprocedures.set(stackIndexes[i], proc);
      }
    }
    if (proc.getState() == ProcedureState.ROLLEDBACK) {
      state = State.ROLLINGBACK;
    } else if (proc.isFailed()) {
      state = State.FAILED;
    }
  }

  /**
   * Called on store load by the ProcedureExecutor to validate the procedure stack.
   * @return true if every stack slot was filled in by some loaded procedure
   */
  protected synchronized boolean isValid() {
    if (subprocedures != null) {
      for (Procedure proc: subprocedures) {
        if (proc == null) {
          return false;
        }
      }
    }
    return true;
  }
}
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.SequentialProcedureData;

/**
 * A SequentialProcedure describes one step in a procedure chain.
 *   -> Step 1 -> Step 2 -> Step 3
 *
 * The main difference from a base Procedure is that the execute() of a
 * SequentialProcedure will be called only once, there will be no second
 * execute() call once the children are finished. which means once the children
 * of a SequentialProcedure are completed the SequentialProcedure is completed too.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvironment> {
  // Tracks whether execute() has already run; persisted so a reloaded
  // procedure does not execute a second time.
  private boolean executed = false;

  /**
   * Runs execute() exactly once: the first call flips 'executed' to true and
   * delegates to execute(env); any later call returns null (no more work).
   */
  @Override
  protected Procedure[] doExecute(final TEnvironment env)
      throws ProcedureYieldException {
    updateTimestamp();
    try {
      executed = !executed;
      return executed ? execute(env) : null;
    } finally {
      updateTimestamp();
    }
  }

  /**
   * Mirrors doExecute(): the toggle returns 'executed' to false and only then
   * is rollback(env) invoked, i.e. rollback runs only for a step that had
   * actually executed.
   */
  @Override
  protected void doRollback(final TEnvironment env) {
    updateTimestamp();
    executed = !executed;
    if (!executed) {
      try {
        rollback(env);
      } finally {
        updateTimestamp();
      }
    }
  }

  /** Persists the 'executed' flag as a delimited protobuf message. */
  @Override
  protected void serializeStateData(final OutputStream stream) throws IOException {
    SequentialProcedureData.Builder data = SequentialProcedureData.newBuilder();
    data.setExecuted(executed);
    data.build().writeDelimitedTo(stream);
  }

  /** Restores the 'executed' flag written by serializeStateData(). */
  @Override
  protected void deserializeStateData(final InputStream stream) throws IOException {
    SequentialProcedureData data = SequentialProcedureData.parseDelimitedFrom(stream);
    executed = data.getExecuted();
  }
}
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachineProcedureData;

/**
 * Procedure described by a series of steps.
 *
 * The procedure implementor must have an enum of 'states', describing
 * the various step of the procedure.
 * Once the procedure is running, the procedure-framework will call executeFromState()
 * using the 'state' provided by the user. The first call to executeFromState()
 * will be performed with 'state = null'. The implementor can jump between
 * states using setNextState(MyStateEnum.ordinal()).
 * The rollback will call rollbackState() for each state that was executed, in reverse order.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class StateMachineProcedure<TEnvironment, TState>
    extends Procedure<TEnvironment> {
  // History of executed state ids; states[0..stateCount-1] are valid.
  // Kept as a plain int[] so it can be serialized compactly.
  private int stateCount = 0;
  private int[] states = null;

  protected enum Flow {
    HAS_MORE_STATE,
    NO_MORE_STATE,
  }

  /**
   * called to perform a single step of the specified 'state' of the procedure
   * @param state state to execute
   * @return Flow.NO_MORE_STATE if the procedure is completed,
   *         Flow.HAS_MORE_STATE if there is another step.
   */
  protected abstract Flow executeFromState(TEnvironment env, TState state)
    throws ProcedureYieldException;

  /**
   * called to perform the rollback of the specified state
   * @param state state to rollback
   */
  protected abstract void rollbackState(TEnvironment env, TState state);

  /**
   * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
   * @param stateId the ordinal() of the state enum (or state id)
   * @return the state enum object
   */
  protected abstract TState getState(int stateId);

  /**
   * Executes the latest recorded state (null on the very first call) and
   * reschedules itself while executeFromState() reports more work, unless the
   * procedure has moved to a waiting or failed state in the meantime.
   */
  @Override
  protected Procedure[] execute(final TEnvironment env)
      throws ProcedureYieldException {
    updateTimestamp();
    try {
      TState state = stateCount > 0 ? getState(states[stateCount-1]) : null;
      if (executeFromState(env, state) == Flow.NO_MORE_STATE) {
        // completed
        return null;
      }
      return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
    } finally {
      updateTimestamp();
    }
  }

  /**
   * Rolls back the most recently executed state and pops it off the history,
   * so repeated calls walk the executed states in reverse order.
   */
  @Override
  protected void rollback(final TEnvironment env) {
    try {
      assert stateCount > 0 : "called rollback with no steps executed proc=" + this;
      updateTimestamp();
      rollbackState(env, getState(states[--stateCount]));
    } finally {
      updateTimestamp();
    }
  }

  /**
   * Set the next state for the procedure.
   * Grows the backing array in chunks of 8 when full.
   * @param stateId the ordinal() of the state enum (or state id)
   */
  protected void setNextState(final int stateId) {
    if (states == null || states.length == stateCount) {
      int newCapacity = stateCount + 8;
      if (states != null) {
        states = Arrays.copyOf(states, newCapacity);
      } else {
        states = new int[newCapacity];
      }
    }
    states[stateCount++] = stateId;
  }

  /** Persists the executed state-id history as a delimited protobuf message. */
  @Override
  protected void serializeStateData(final OutputStream stream) throws IOException {
    StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
    for (int i = 0; i < stateCount; ++i) {
      data.addState(states[i]);
    }
    data.build().writeDelimitedTo(stream);
  }

  /** Restores the state-id history written by serializeStateData(). */
  @Override
  protected void deserializeStateData(final InputStream stream) throws IOException {
    StateMachineProcedureData data = StateMachineProcedureData.parseDelimitedFrom(stream);
    stateCount = data.getStateCount();
    if (stateCount > 0) {
      states = new int[stateCount];
      for (int i = 0; i < stateCount; ++i) {
        states[i] = data.getState(i);
      }
    } else {
      states = null;
    }
  }
}
 * The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
 * Placeholder base class for procedures that will follow a two-phase
 * (prepare/commit-style) execution. Currently carries no behavior of its own.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class TwoPhaseProcedure<TEnvironment> extends Procedure<TEnvironment> {
  // TODO (e.g. used by ACLs/VisibilityTags updates)
}
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+
+/**
+ * The ProcedureStore is used by the executor to persist the state of each procedure execution.
+ * This allows resuming the execution of pending/in-progress procedures in case
+ * of machine failure or service shutdown.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ProcedureStore {
+  /**
+   * Store listener interface.
+   * The main process should register a listener and respond to the store events.
+   */
+  public interface ProcedureStoreListener {
+    /**
+     * Triggered when the store is not able to write out data.
+     * The main process should abort.
+     */
+    void abortProcess();
+  }
+
+  /**
+   * Add the listener to the notification list.
+   * @param listener The ProcedureStoreListener to register
+   */
+  void registerListener(ProcedureStoreListener listener);
+
+  /**
+   * Remove the listener from the notification list.
+   * @param listener The ProcedureStoreListener to unregister
+   * @return true if the listener was in the list and it was removed, otherwise false.
+   */
+  boolean unregisterListener(ProcedureStoreListener listener);
+
+  /**
+   * Start/Open the procedure store
+   * @param numThreads number of threads to be used by the store
+   * @throws IOException if the store cannot be started/opened
+   */
+  void start(int numThreads) throws IOException;
+
+  /**
+   * Stop/Close the procedure store
+   * @param abort true if the stop is an abort
+   */
+  void stop(boolean abort);
+
+  /**
+   * @return true if the store is running, otherwise false.
+   */
+  boolean isRunning();
+
+  /**
+   * Acquire the lease for the procedure store.
+   * @throws IOException if the lease cannot be acquired
+   */
+  void recoverLease() throws IOException;
+
+  /**
+   * Load the Procedures in the store.
+   * @return the set of procedures present in the store
+   * @throws IOException if the store content cannot be read
+   */
+  Iterator<Procedure> load() throws IOException;
+
+  /**
+   * When a procedure is submitted to the executor insert(proc, null) will be called.
+   * 'proc' has a 'RUNNABLE' state and the initial information required to start up.
+   *
+   * When a procedure is executed and it returns children insert(proc, subprocs) will be called.
+   * 'proc' has a 'WAITING' state and an update state.
+   * 'subprocs' are the children in 'RUNNABLE' state with the initial information.
+   *
+   * @param proc the procedure to serialize and write to the store.
+   * @param subprocs the newly created child of the proc.
+   */
+  void insert(Procedure proc, Procedure[] subprocs);
+
+  /**
+   * The specified procedure was executed,
+   * and the new state should be written to the store.
+   * @param proc the procedure to serialize and write to the store.
+   */
+  void update(Procedure proc);
+
+  /**
+   * The specified procId was removed from the executor,
+   * due to completion, abort or failure.
+   * The store implementor should remove all the information about the specified procId.
+   * @param procId the ID of the procedure to remove.
+   */
+  void delete(long procId);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
new file mode 100644
index 0000000..4e4653a
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
+
+/**
+ * Keeps track of live procedures.
+ *
+ * It can be used by the ProcedureStore to identify which procedures are already
+ * deleted/completed to avoid the deserialization step on restart.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ProcedureStoreTracker {
+  // Sorted map of bitmap nodes, keyed by each node's start proc-id.
+  private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>();
+
+  // When true, delete() keeps fully-empty nodes around (used while replaying logs).
+  private boolean keepDeletes = false;
+  // When true, the tracker was built from partial information and unknown
+  // ids answer MAYBE instead of a definite YES/NO (see isDeleted()).
+  private boolean partial = false;
+
+  public enum DeleteState { YES, NO, MAYBE }
+
+  /**
+   * A node covering a contiguous, 64-bit-word-aligned range of proc-ids.
+   * Two parallel bitmaps are kept: 'updated' (id written since the last
+   * resetUpdates()) and 'deleted' (bit set = id is no longer live).
+   */
+  public static class BitSetNode {
+    private final static long WORD_MASK = 0xffffffffffffffffL;
+    private final static int ADDRESS_BITS_PER_WORD = 6;
+    private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
+    private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
+
+    private long[] updated;
+    private long[] deleted;
+    private long start;  // first proc-id covered by this node (word aligned)
+
+    /** Debug helper: prints both bitmaps to stdout. */
+    public void dump() {
+      System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
+        getMinProcId(), getMaxProcId());
+      System.out.println("Update:");
+      for (int i = 0; i < updated.length; ++i) {
+        for (int j = 0; j < BITS_PER_WORD; ++j) {
+          System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0");
+        }
+        System.out.println(" " + i);
+      }
+      System.out.println();
+      System.out.println("Delete:");
+      for (int i = 0; i < deleted.length; ++i) {
+        for (int j = 0; j < BITS_PER_WORD; ++j) {
+          System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
+        }
+        System.out.println(" " + i);
+      }
+      System.out.println();
+    }
+
+    public BitSetNode(final long procId, final boolean partial) {
+      start = alignDown(procId);
+
+      // Nodes always start with 2 words (128 ids); grow() extends them.
+      int count = 2;
+      updated = new long[count];
+      deleted = new long[count];
+      for (int i = 0; i < count; ++i) {
+        updated[i] = 0;
+        // In partial mode nothing is assumed deleted; otherwise every id
+        // starts deleted until explicitly updated.
+        deleted[i] = partial ? 0 : WORD_MASK;
+      }
+
+      updateState(procId, false);
+    }
+
+    protected BitSetNode(final long start, final long[] updated, final long[] deleted) {
+      this.start = start;
+      this.updated = updated;
+      this.deleted = deleted;
+    }
+
+    public void update(final long procId) {
+      updateState(procId, false);
+    }
+
+    public void delete(final long procId) {
+      updateState(procId, true);
+    }
+
+    // NOTE(review): boxed Long return (and Long map keys) causes needless
+    // autoboxing; a primitive long would do - confirm against callers.
+    public Long getStart() {
+      return start;
+    }
+
+    /** @return the last proc-id covered by this node, inclusive. */
+    public Long getEnd() {
+      return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
+    }
+
+    public boolean contains(final long procId) {
+      return start <= procId && procId <= getEnd();
+    }
+
+    public DeleteState isDeleted(final long procId) {
+      int bitmapIndex = getBitmapIndex(procId);
+      int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+      if (wordIndex >= deleted.length) {
+        return DeleteState.MAYBE;
+      }
+      // (1L << bitmapIndex) relies on Java masking long shift distances to
+      // 0-63, so this selects the bit *within* the word picked above.
+      return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
+    }
+
+    private boolean isUpdated(final long procId) {
+      int bitmapIndex = getBitmapIndex(procId);
+      int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+      if (wordIndex >= updated.length) {
+        return false;
+      }
+      return (updated[wordIndex] & (1L << bitmapIndex)) != 0;
+    }
+
+    /** @return true if every non-deleted id in this node has been updated. */
+    public boolean isUpdated() {
+      // TODO: cache the value
+      for (int i = 0; i < updated.length; ++i) {
+        long deleteMask = ~deleted[i];
+        if ((updated[i] & deleteMask) != (WORD_MASK & deleteMask)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /** @return true if every id in this node is deleted (node can be dropped). */
+    public boolean isEmpty() {
+      // TODO: cache the value
+      for (int i = 0; i < deleted.length; ++i) {
+        if (deleted[i] != WORD_MASK) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    public void resetUpdates() {
+      for (int i = 0; i < updated.length; ++i) {
+        updated[i] = 0;
+      }
+    }
+
+    public void undeleteAll() {
+      // NOTE(review): the loop bound is updated.length while writing deleted[];
+      // works only because both arrays are always kept the same length.
+      for (int i = 0; i < updated.length; ++i) {
+        deleted[i] = 0;
+      }
+    }
+
+    /** Serializes this node to its protobuf representation. */
+    public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
+      ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
+        ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
+      builder.setStartId(start);
+      for (int i = 0; i < updated.length; ++i) {
+        builder.addUpdated(updated[i]);
+        builder.addDeleted(deleted[i]);
+      }
+      return builder.build();
+    }
+
+    /** Deserializes a node from its protobuf representation. */
+    public static BitSetNode convert(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
+      long start = data.getStartId();
+      int size = data.getUpdatedCount();
+      long[] updated = new long[size];
+      long[] deleted = new long[size];
+      for (int i = 0; i < size; ++i) {
+        updated[i] = data.getUpdated(i);
+        deleted[i] = data.getDeleted(i);
+      }
+      return new BitSetNode(start, updated, deleted);
+    }
+
+    // ========================================================================
+    //  Grow/Merge Helpers
+    // ========================================================================
+    public boolean canGrow(final long procId) {
+      return (procId - start) < MAX_NODE_SIZE;
+    }
+
+    public boolean canMerge(final BitSetNode rightNode) {
+      // NOTE(review): this adds an absolute proc-id (start) to another
+      // absolute proc-id (rightNode.getEnd()) and compares the sum to a size.
+      // The intended check is almost certainly on the merged span, i.e.
+      // (rightNode.getEnd() - start + 1) <= MAX_NODE_SIZE - verify and fix.
+      return (start + rightNode.getEnd()) < MAX_NODE_SIZE;
+    }
+
+    public void grow(final long procId) {
+      int delta, offset;
+
+      if (procId < start) {
+        // add to head
+        long newStart = alignDown(procId);
+        delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD;
+        offset = delta;
+      } else {
+        // Add to tail
+        long newEnd = alignUp(procId + 1);
+        delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
+        offset = 0;
+      }
+
+      long[] newBitmap;
+      int oldSize = updated.length;
+
+      newBitmap = new long[oldSize + delta];
+      System.arraycopy(updated, 0, newBitmap, offset, oldSize);
+      updated = newBitmap;
+
+      newBitmap = new long[deleted.length + delta];
+      System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
+      deleted = newBitmap;
+
+      // NOTE(review): when growing at the head (offset == delta) the fresh
+      // words live at indices [0, delta), but this loop initializes
+      // [oldSize, oldSize + delta) - overwriting just-copied words whenever
+      // delta < oldSize and leaving the head words with deleted == 0.
+      // It looks correct only for tail growth; verify the head path.
+      for (int i = 0; i < delta; ++i) {
+        updated[oldSize + i] = 0;
+        deleted[oldSize + i] = WORD_MASK;
+      }
+    }
+
+    public void merge(final BitSetNode rightNode) {
+      int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
+
+      long[] newBitmap;
+      int oldSize = updated.length;
+      int newSize = (delta - rightNode.updated.length);
+      int offset = oldSize + newSize;
+
+      newBitmap = new long[oldSize + delta];
+      System.arraycopy(updated, 0, newBitmap, 0, oldSize);
+      System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length);
+      updated = newBitmap;
+
+      newBitmap = new long[oldSize + delta];
+      System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
+      System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
+      deleted = newBitmap;
+
+      // NOTE(review): the gap between the two nodes sits at [oldSize, offset),
+      // but this loop writes [offset, offset + newSize), clobbering the first
+      // words just copied from rightNode and leaving the gap words as
+      // 0/0 (everything live, nothing updated). Suspected off-by-offset bug;
+      // the index should probably be oldSize + i - verify.
+      for (int i = 0; i < newSize; ++i) {
+        updated[offset + i] = 0;
+        deleted[offset + i] = WORD_MASK;
+      }
+    }
+
+    // ========================================================================
+    //  Min/Max Helpers
+    // ========================================================================
+    public long getMinProcId() {
+      long minProcId = start;
+      for (int i = 0; i < deleted.length; ++i) {
+        if (deleted[i] == 0) {
+          return(minProcId);
+        }
+
+        if (deleted[i] != WORD_MASK) {
+          for (int j = 0; j < BITS_PER_WORD; ++j) {
+            // NOTE(review): a set bit in 'deleted' means the id IS deleted,
+            // so this returns the first *deleted* id, not the first live one.
+            // getMaxProcId() below uses the opposite (== 0) test; this
+            // condition looks inverted - verify.
+            if ((deleted[i] & (1L << j)) != 0) {
+              return minProcId + j;
+            }
+          }
+        }
+
+        minProcId += BITS_PER_WORD;
+      }
+      return minProcId;
+    }
+
+    public long getMaxProcId() {
+      long maxProcId = getEnd();
+      for (int i = deleted.length - 1; i >= 0; --i) {
+        if (deleted[i] == 0) {
+          return maxProcId;
+        }
+
+        if (deleted[i] != WORD_MASK) {
+          for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
+            if ((deleted[i] & (1L << j)) == 0) {
+              return maxProcId - (BITS_PER_WORD - 1 - j);
+            }
+          }
+        }
+        maxProcId -= BITS_PER_WORD;
+      }
+      return maxProcId;
+    }
+
+    // ========================================================================
+    //  Bitmap Helpers
+    // ========================================================================
+    private int getBitmapIndex(final long procId) {
+      return (int)(procId - start);
+    }
+
+    private void updateState(final long procId, final boolean isDeleted) {
+      int bitmapIndex = getBitmapIndex(procId);
+      int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+      // Shift distance is masked to 0-63 by Java, selecting the in-word bit.
+      long value = (1L << bitmapIndex);
+
+      if (isDeleted) {
+        updated[wordIndex] |= value;
+        deleted[wordIndex] |= value;
+      } else {
+        updated[wordIndex] |= value;
+        deleted[wordIndex] &= ~value;
+      }
+    }
+
+    // ========================================================================
+    //  Helpers
+    // ========================================================================
+    private static long alignUp(final long x) {
+      return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
+    }
+
+    private static long alignDown(final long x) {
+      return x & -BITS_PER_WORD;
+    }
+  }
+
+  /** Marks a procedure and its children (if any) as live/updated. */
+  public void insert(final Procedure proc, final Procedure[] subprocs) {
+    insert(proc.getProcId());
+    if (subprocs != null) {
+      for (int i = 0; i < subprocs.length; ++i) {
+        insert(subprocs[i].getProcId());
+      }
+    }
+  }
+
+  public void update(final Procedure proc) {
+    update(proc.getProcId());
+  }
+
+  public void insert(long procId) {
+    BitSetNode node = getOrCreateNode(procId);
+    node.update(procId);
+  }
+
+  public void update(long procId) {
+    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+    // NOTE(review): with assertions disabled, a missing node becomes an NPE
+    // on the getValue() call below.
+    assert entry != null : "expected node to update procId=" + procId;
+
+    BitSetNode node = entry.getValue();
+    assert node.contains(procId);
+    node.update(procId);
+  }
+
+  public void delete(long procId) {
+    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+    assert entry != null : "expected node to delete procId=" + procId;
+
+    BitSetNode node = entry.getValue();
+    assert node.contains(procId) : "expected procId in the node";
+    node.delete(procId);
+
+    // Drop the node once every id in it is deleted (unless replaying logs).
+    if (!keepDeletes && node.isEmpty()) {
+      // TODO: RESET if (map.size() == 1)
+      map.remove(entry.getKey());
+    }
+  }
+
+  @InterfaceAudience.Private
+  public void setDeleted(final long procId, final boolean isDeleted) {
+    BitSetNode node = getOrCreateNode(procId);
+    node.updateState(procId, isDeleted);
+  }
+
+  public void clear() {
+    this.map.clear();
+  }
+
+  public DeleteState isDeleted(long procId) {
+    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+    if (entry != null) {
+      BitSetNode node = entry.getValue();
+      DeleteState state = node.isDeleted(procId);
+      // In partial mode an id we never saw updated cannot be answered.
+      return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
+    }
+    return partial ? DeleteState.MAYBE : DeleteState.YES;
+  }
+
+  public long getMinProcId() {
+    // TODO: Cache?
+    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
+    return entry == null ? 0 : entry.getValue().getMinProcId();
+  }
+
+  public void setKeepDeletes(boolean keepDeletes) {
+    this.keepDeletes = keepDeletes;
+    // When turning the flag off, prune nodes that are already fully deleted.
+    if (!keepDeletes) {
+      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<Long, BitSetNode> entry = it.next();
+        if (entry.getValue().isEmpty()) {
+          it.remove();
+        }
+      }
+    }
+  }
+
+  public void setPartialFlag(boolean isPartial) {
+    this.partial = isPartial;
+  }
+
+  public boolean isEmpty() {
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      if (entry.getValue().isEmpty() == false) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public boolean isUpdated() {
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      if (entry.getValue().isUpdated() == false) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public void resetUpdates() {
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      entry.getValue().resetUpdates();
+    }
+  }
+
+  public void undeleteAll() {
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      entry.getValue().undeleteAll();
+    }
+  }
+
+  private BitSetNode getOrCreateNode(final long procId) {
+    // can procId fit in the left node?
+    BitSetNode leftNode = null;
+    boolean leftCanGrow = false;
+    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
+    if (leftEntry != null) {
+      leftNode = leftEntry.getValue();
+      if (leftNode.contains(procId)) {
+        return leftNode;
+      }
+      leftCanGrow = leftNode.canGrow(procId);
+    }
+
+    BitSetNode rightNode = null;
+    boolean rightCanGrow = false;
+    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
+    if (rightEntry != null) {
+      rightNode = rightEntry.getValue();
+      rightCanGrow = rightNode.canGrow(procId);
+      if (leftNode != null) {
+        // NOTE(review): merging is preferred over growing; correctness here
+        // depends on BitSetNode.canMerge()/merge(), both flagged as suspect
+        // in the nested class above - re-check once those are fixed.
+        if (leftNode.canMerge(rightNode)) {
+          // merge left and right node
+          return mergeNodes(leftNode, rightNode);
+        }
+
+        if (leftCanGrow && rightCanGrow) {
+          // Grow whichever neighbor is closer to procId.
+          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
+            // grow the left node
+            return growNode(leftNode, procId);
+          }
+          // grow the right node
+          return growNode(rightNode, procId);
+        }
+      }
+    }
+
+    // grow the left node
+    if (leftCanGrow) {
+      return growNode(leftNode, procId);
+    }
+
+    // grow the right node
+    if (rightCanGrow) {
+      return growNode(rightNode, procId);
+    }
+
+    // add new node
+    BitSetNode node = new BitSetNode(procId, partial);
+    map.put(node.getStart(), node);
+    return node;
+  }
+
+  // Remove/re-insert so the map key follows a possibly-changed start id.
+  private BitSetNode growNode(BitSetNode node, long procId) {
+    map.remove(node.getStart());
+    node.grow(procId);
+    map.put(node.getStart(), node);
+    return node;
+  }
+
+  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
+    leftNode.merge(rightNode);
+    map.remove(rightNode.getStart());
+    return leftNode;
+  }
+
+  /** Debug helper: prints every node's bitmaps to stdout. */
+  public void dump() {
+    System.out.println("map " + map.size());
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      entry.getValue().dump();
+    }
+  }
+
+  /** Serializes the tracker (delimited protobuf) onto the given stream. */
+  public void writeTo(final OutputStream stream) throws IOException {
+    ProcedureProtos.ProcedureStoreTracker.Builder builder =
+        ProcedureProtos.ProcedureStoreTracker.newBuilder();
+    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+      builder.addNode(entry.getValue().convert());
+    }
+    builder.build().writeDelimitedTo(stream);
+  }
+
+  /** Replaces the tracker content with the delimited protobuf on the stream. */
+  public void readFrom(final InputStream stream) throws IOException {
+    ProcedureProtos.ProcedureStoreTracker data =
+        ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
+    map.clear();
+    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
+      BitSetNode node = BitSetNode.convert(protoNode);
+      map.put(node.getStart(), node);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
new file mode 100644
index 0000000..29db3bf
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when a procedure WAL is corrupted
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+// NOTE(review): exceptions are Serializable; consider declaring an explicit
+// serialVersionUID, and confirm Stable (vs the Evolving used elsewhere in
+// this commit) is intended.
+public class CorruptedWALProcedureStoreException extends HBaseIOException {
+  /** default constructor */
+  public CorruptedWALProcedureStoreException() {
+    super();
+  }
+
+  /**
+   * Constructor
+   * @param s message
+   */
+  public CorruptedWALProcedureStoreException(String s) {
+    super(s);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
new file mode 100644
index 0000000..859b3cb
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
+
+/**
+ * Describes a WAL File
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
+  private static final Log LOG = LogFactory.getLog(ProcedureWALFile.class);
+
+  private ProcedureWALHeader header;
+  private FSDataInputStream stream;
+  private FileStatus logStatus;
+  private FileSystem fs;
+  private Path logFile;
+  private long startPos;  // position right after the header
+
+  public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
+    this.fs = fs;
+    this.logStatus = logStatus;
+    this.logFile = logStatus.getPath();
+  }
+
+  // NOTE(review): this constructor never sets logStatus, so readTrailer()
+  // and getSize() will NPE on instances built this way - confirm callers
+  // only use them on instances from the FileStatus constructor.
+  public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, long startPos) {
+    this.fs = fs;
+    this.logFile = logFile;
+    this.header = header;
+    this.startPos = startPos;
+  }
+
+  /** Opens the file (if needed) and positions the stream after the header. */
+  public void open() throws IOException {
+    if (stream == null) {
+      stream = fs.open(logFile);
+    }
+
+    if (header == null) {
+      header = ProcedureWALFormat.readHeader(stream);
+      startPos = stream.getPos();
+    } else {
+      stream.seek(startPos);
+    }
+  }
+
+  /** Reads the trailer, restoring the stream position afterwards. */
+  public ProcedureWALTrailer readTrailer() throws IOException {
+    try {
+      return ProcedureWALFormat.readTrailer(stream, startPos, logStatus.getLen());
+    } finally {
+      stream.seek(startPos);
+    }
+  }
+
+  /** Loads the tracker stored in the trailer, restoring the position after. */
+  public void readTracker(ProcedureStoreTracker tracker) throws IOException {
+    ProcedureWALTrailer trailer = readTrailer();
+    try {
+      stream.seek(trailer.getTrackerPos());
+      tracker.readFrom(stream);
+    } finally {
+      stream.seek(startPos);
+    }
+  }
+
+  public void close() {
+    if (stream == null) return;
+    try {
+      stream.close();
+    } catch (IOException e) {
+      LOG.warn("unable to close the wal file: " + logFile, e);
+    } finally {
+      stream = null;
+    }
+  }
+
+  public FSDataInputStream getStream() {
+    return stream;
+  }
+
+  public ProcedureWALHeader getHeader() {
+    return header;
+  }
+
+  public boolean isCompacted() {
+    return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
+  }
+
+  public long getLogId() {
+    return header.getLogId();
+  }
+
+  public long getSize() {
+    return logStatus.getLen();
+  }
+
+  public void removeFile() throws IOException {
+    close();
+    fs.delete(logFile, false);
+  }
+
+  @Override
+  public int compareTo(final ProcedureWALFile other) {
+    long diff = header.getLogId() - other.header.getLogId();
+    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+  }
+
+  // NOTE(review): equals() compares by logId (via compareTo) while hashCode()
+  // uses the logFile path - two files deemed equal can hash differently,
+  // breaking the equals/hashCode contract for hash-based collections. Verify
+  // and align both on the same field.
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof ProcedureWALFile)) return false;
+    return compareTo((ProcedureWALFile)o) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return logFile.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return logFile.toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
new file mode 100644
index 0000000..17432ac
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.procedure2.store.wal; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Iterator; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; +import org.apache.hadoop.hbase.procedure2.util.ByteSlot; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; + +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * Helper class that contains the WAL serialization utils. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class ProcedureWALFormat { + static final byte LOG_TYPE_STREAM = 0; + static final byte LOG_TYPE_COMPACTED = 1; + static final byte LOG_TYPE_MAX_VALID = 1; + + static final byte HEADER_VERSION = 1; + static final byte TRAILER_VERSION = 1; + static final long HEADER_MAGIC = 0x31764c4157637250L; + static final long TRAILER_MAGIC = 0x50726357414c7631L; + + @InterfaceAudience.Private + public static class InvalidWALDataException extends IOException { + public InvalidWALDataException(String s) { + super(s); + } + + public InvalidWALDataException(Throwable t) { + super(t); + } + } + + interface Loader { + void removeLog(ProcedureWALFile log); + void markCorruptedWAL(ProcedureWALFile log, IOException e); + } + + private ProcedureWALFormat() {} + + public static Iterator<Procedure> load(final Iterator<ProcedureWALFile> logs, + final ProcedureStoreTracker tracker, final Loader loader) throws IOException { + ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker); + tracker.setKeepDeletes(true); + try { + while (logs.hasNext()) { + ProcedureWALFile log = logs.next(); + log.open(); + try { + reader.read(log, loader); + } finally { + log.close(); + } + } + // The tracker is now updated with all the procedures read from the logs + tracker.setPartialFlag(false); + tracker.resetUpdates(); + } finally { + tracker.setKeepDeletes(false); + } + // TODO: Write compacted version? 
    // Tail of a method whose start is above this view: hands back the
    // procedures accumulated by the reader.
    return reader.getProcedures();
  }

  /**
   * Writes the WAL header as a length-delimited protobuf message at the
   * current stream position (normally the very start of a new log file).
   *
   * @param stream the destination stream
   * @param header the header to serialize
   * @throws IOException if the underlying stream write fails
   */
  public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
      throws IOException {
    header.writeDelimitedTo(stream);
  }

  /*
   * On-disk layout of the trailer appended by writeTrailer():
   *
   * +-----------------+
   * | END OF WAL DATA | <---+
   * +-----------------+     |
   * |                 |     |
   * |     Tracker     |     |
   * |                 |     |
   * +-----------------+     |
   * |     version     |     |
   * +-----------------+     |
   * |  TRAILER_MAGIC  |     |
   * +-----------------+     |
   * |      offset     |-----+
   * +-----------------+
   */
  /**
   * Appends the trailer to a log that is being closed: an EOF entry marking
   * the end of WAL data, the serialized tracker, and a fixed-size tail that
   * lets a reader locate the trailer by seeking backwards from the end of
   * the file (see readTrailer()).
   *
   * @param stream  the log output stream, positioned at the end of WAL data
   * @param tracker the procedure-store tracker to persist with the log
   * @throws IOException if the underlying stream write fails
   */
  public static void writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
      throws IOException {
    // Current position is where WAL data ends; the "offset" written at the
    // very end of the file points back here so a reader can jump to it.
    long offset = stream.getPos();

    // Write EOF Entry
    ProcedureWALEntry.newBuilder()
      .setType(ProcedureWALEntry.Type.EOF)
      .build().writeDelimitedTo(stream);

    // Write Tracker
    tracker.writeTo(stream);

    // Fixed-size tail: 1 (version byte) + 8 (magic) + 8 (offset) = 17 bytes.
    // readTrailer() depends on this exact size ("size - 17") to find it.
    stream.write(TRAILER_VERSION);
    StreamUtils.writeLong(stream, TRAILER_MAGIC);
    StreamUtils.writeLong(stream, offset);
  }

  /**
   * Reads and validates the delimited-protobuf header at the current stream
   * position.
   *
   * @param stream the log input stream, positioned at the header
   * @return the parsed header
   * @throws InvalidWALDataException if the stream is empty, the protobuf is
   *         malformed, or the version/type fields fail validation
   * @throws IOException if the underlying stream read fails
   */
  public static ProcedureWALHeader readHeader(InputStream stream)
      throws IOException {
    ProcedureWALHeader header;
    try {
      header = ProcedureWALHeader.parseDelimitedFrom(stream);
    } catch (InvalidProtocolBufferException e) {
      // Wrap the protobuf parse failure so callers see a store-level error.
      throw new InvalidWALDataException(e);
    }

    if (header == null) {
      // parseDelimitedFrom() returns null on a clean EOF (empty file).
      throw new InvalidWALDataException("No data available to read the Header");
    }

    if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
      throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
          " expected " + HEADER_VERSION);
    }

    if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
      throw new InvalidWALDataException("Invalid header type. got " + header.getType());
    }

    return header;
  }

  /**
   * Locates and validates the trailer of a closed log by seeking to the
   * fixed-size 17-byte tail at the end of the file (see writeTrailer()),
   * verifying the version and magic, then following the stored back-pointer
   * to the EOF entry that marks the end of WAL data.
   *
   * @param stream   the log input stream
   * @param startPos offset where WAL data begins (end of the header)
   * @param size     total length of the log file
   * @return a trailer whose trackerPos points at the serialized tracker
   *         (i.e. the position right after the EOF entry)
   * @throws InvalidWALDataException if the trailer is missing or corrupt
   * @throws IOException if the underlying stream read/seek fails
   */
  public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
      throws IOException {
    long trailerPos = size - 17; // Beginning of the Trailer Jump

    if (trailerPos < startPos) {
      // File is too short to contain even the fixed-size tail: the log was
      // never closed cleanly (or was truncated).
      throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
    }

    stream.seek(trailerPos);
    int version = stream.read();
    if (version != TRAILER_VERSION) {
      throw new InvalidWALDataException("Invalid Trailer version. got " + version +
          " expected " + TRAILER_VERSION);
    }

    long magic = StreamUtils.readLong(stream);
    if (magic != TRAILER_MAGIC) {
      throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
          " expected " + TRAILER_MAGIC);
    }

    // Follow the back-pointer to where writeTrailer() started writing.
    long trailerOffset = StreamUtils.readLong(stream);
    stream.seek(trailerOffset);

    // The first thing at that offset must be the EOF entry.
    ProcedureWALEntry entry = readEntry(stream);
    if (entry.getType() != ProcedureWALEntry.Type.EOF) {
      throw new InvalidWALDataException("Invalid Trailer begin");
    }

    // After readEntry() the stream sits at the serialized tracker.
    ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
      .setVersion(version)
      .setTrackerPos(stream.getPos())
      .build();
    return trailer;
  }

  /**
   * Reads one length-delimited WAL entry from the stream.
   *
   * @return the parsed entry, or null at end of stream
   * @throws IOException if the underlying stream read fails
   */
  public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
    return ProcedureWALEntry.parseDelimitedFrom(stream);
  }

  /**
   * Serializes a WAL entry of the given type carrying the procedure and,
   * optionally, its sub-procedures into the slot.
   *
   * @param slot     the byte slot the entry is appended to
   * @param type     the entry type to record
   * @param proc     the main procedure (always the first in the entry)
   * @param subprocs optional sub-procedures appended after proc; may be null
   * @throws IOException if writing to the slot fails
   */
  public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
      Procedure proc, Procedure[] subprocs) throws IOException {
    ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
    builder.setType(type);
    builder.addProcedure(Procedure.convert(proc));
    if (subprocs != null) {
      for (int i = 0; i < subprocs.length; ++i) {
        builder.addProcedure(Procedure.convert(subprocs[i]));
      }
    }
    builder.build().writeDelimitedTo(slot);
  }

  /**
   * Writes the first insert of a procedure (no children) as an INIT entry.
   * NOTE(review): this overload writes Type.INIT while the subprocs overload
   * below writes Type.INSERT — presumably INIT marks a brand-new root
   * procedure; confirm against the reader's readInitEntry()/readInsertEntry().
   */
  public static void writeInsert(ByteSlot slot, Procedure proc)
      throws IOException {
    writeEntry(slot, ProcedureWALEntry.Type.INIT, proc, null);
  }

  /**
   * Writes an insert of a procedure together with its spawned sub-procedures
   * as a single INSERT entry.
   */
  public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
      throws IOException {
    writeEntry(slot, ProcedureWALEntry.Type.INSERT, proc, subprocs);
  }

  /**
   * Writes the updated state of a single procedure as an UPDATE entry.
   */
  public static void writeUpdate(ByteSlot slot, Procedure proc)
      throws IOException {
    writeEntry(slot, ProcedureWALEntry.Type.UPDATE, proc, null);
  }

  /**
   * Writes a DELETE entry for the given procedure id. Unlike the other
   * writers this carries only the id — no serialized procedure body.
   */
  public static void writeDelete(ByteSlot slot, long procId)
      throws IOException {
    ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
    builder.setType(ProcedureWALEntry.Type.DELETE);
    builder.setProcId(procId);
    builder.build().writeDelimitedTo(slot);
  }
}
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;

/**
 * Helper class that loads the procedures stored in a WAL.
 *
 * <p>read() may be called once per log file; state in {@link #procedures} and
 * the tracker carries over between calls, so entries already loaded from a
 * previously-read log are not overwritten (see isRequired()).
 * NOTE(review): this suggests logs are replayed newest-first so the freshest
 * state wins — confirm with the caller that drives read().
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureWALFormatReader {
  private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);

  // Shared deleted/updated bitmap; consulted to skip procedures already
  // known to be deleted, and updated as entries are replayed.
  private final ProcedureStoreTracker tracker;
  //private final long compactionLogId;

  // Final result: procId -> deserialized Procedure, accumulated across all
  // read() calls.
  private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
  // Raw protobuf procedures surviving the log currently being read; drained
  // into "procedures" at the end of each read() call.
  private final Map<Long, ProcedureProtos.Procedure> localProcedures =
      new HashMap<Long, ProcedureProtos.Procedure>();

  // Highest procedure id seen in any entry (including deletes).
  // NOTE(review): updated but never exposed in this view — verify a getter
  // exists (or is added) where the caller needs the max id.
  private long maxProcId = 0;

  public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
    this.tracker = tracker;
  }

  /**
   * Replays one WAL file: reads entries until an EOF entry (or the stream is
   * exhausted), applying each to the local state, then promotes the surviving
   * entries into the global procedure map.
   *
   * <p>On an I/O error mid-file the log is reported to the loader as corrupted
   * but the entries decoded so far are still kept.
   *
   * @param log    the WAL file to replay
   * @param loader callback used to report corrupted logs and logs with no
   *               live entries
   * @throws IOException currently only from callees outside the guarded loop
   */
  public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
    FSDataInputStream stream = log.getStream();
    try {
      boolean hasMore = true;
      while (hasMore) {
        ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
        if (entry == null) {
          // Stream ended without an EOF entry: the log was not closed
          // cleanly. Stop reading but keep what we decoded so far.
          LOG.warn("nothing left to decode. exiting with missing EOF");
          hasMore = false;
          break;
        }
        switch (entry.getType()) {
          case INIT:
            readInitEntry(entry);
            break;
          case INSERT:
            readInsertEntry(entry);
            break;
          case UPDATE:
          case COMPACT:
            // COMPACT entries carry full procedure state, same as UPDATE.
            readUpdateEntry(entry);
            break;
          case DELETE:
            readDeleteEntry(entry);
            break;
          case EOF:
            hasMore = false;
            break;
          default:
            throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
        }
      }
    } catch (IOException e) {
      // Deliberately swallowed after reporting: replay continues with the
      // entries decoded before the failure.
      LOG.error("got an exception while reading the procedure WAL: " + log, e);
      loader.markCorruptedWAL(log, e);
    }

    if (localProcedures.isEmpty()) {
      // Every entry in this log was superseded or deleted; the file carries
      // no live state and can be discarded.
      LOG.info("No active entry found in state log " + log + ". removing it");
      loader.removeLog(log);
    } else {
      // Drain localProcedures: deserialize each surviving protobuf into a
      // Procedure and promote it to the global map.
      Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
        localProcedures.entrySet().iterator();
      while (itd.hasNext()) {
        Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
        itd.remove();

        // Deserialize the procedure
        Procedure proc = Procedure.convert(entry.getValue());
        procedures.put(entry.getKey(), proc);
      }

      // TODO: Some procedure may be already runnables (see readInitEntry())
      //       (we can also check the "update map" in the log trackers)
    }
  }

  /**
   * @return an iterator over all procedures loaded so far.
   */
  public Iterator<Procedure> getProcedures() {
    return procedures.values().iterator();
  }

  /**
   * Applies every procedure carried by the entry to the local state,
   * skipping procedures that are deleted or were already loaded from a
   * previously-read log (see isRequired()). Also advances maxProcId and
   * clears the deleted flag in the tracker for each accepted procedure.
   */
  private void loadEntries(final ProcedureWALEntry entry) {
    for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
      maxProcId = Math.max(maxProcId, proc.getProcId());
      if (isRequired(proc.getProcId())) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
        }
        localProcedures.put(proc.getProcId(), proc);
        tracker.setDeleted(proc.getProcId(), false);
      }
    }
  }

  /**
   * INIT entry: the first insert of a root procedure; carries exactly one
   * procedure (see the writer's single-procedure writeInsert()).
   */
  private void readInitEntry(final ProcedureWALEntry entry)
      throws IOException {
    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
    // TODO: Make it runnable, before reading other files
    loadEntries(entry);
  }

  /**
   * INSERT entry: a procedure together with its spawned sub-procedures;
   * carries one or more procedures.
   */
  private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
    assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
    loadEntries(entry);
  }

  /**
   * UPDATE (or COMPACT) entry: the refreshed state of a single procedure.
   */
  private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
    loadEntries(entry);
  }

  /**
   * DELETE entry: carries only a procedure id (no body). Removes the
   * procedure from the local state and marks it deleted in the tracker so
   * older logs read later will skip it.
   */
  private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
    assert entry.getProcedureCount() == 0 : "Expected no procedures";
    assert entry.hasProcId() : "expected ProcID";
    if (LOG.isTraceEnabled()) {
      LOG.trace("read delete entry " + entry.getProcId());
    }
    maxProcId = Math.max(maxProcId, entry.getProcId());
    localProcedures.remove(entry.getProcId());
    tracker.setDeleted(entry.getProcId(), true);
  }

  /**
   * @return true only when the tracker definitively marks the id as deleted
   *         (MAYBE/NO both count as not deleted here).
   */
  private boolean isDeleted(final long procId) {
    return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
  }

  /**
   * A procedure is worth loading only if it is not deleted and has not
   * already been loaded from a previously-read log.
   */
  private boolean isRequired(final long procId) {
    return !isDeleted(procId) && !procedures.containsKey(procId);
  }
}