http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java new file mode 100644 index 0000000..fa5df11 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +/** + * Helper classes for the container protocol communication. + */ \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java new file mode 100644 index 0000000..dfa9315 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.lease; + +import org.apache.hadoop.util.Time; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; + +/** + * This class represents the lease created on a resource. Callback can be + * registered on the lease which will be executed in case of timeout. + * + * @param <T> Resource type for which the lease can be associated + */ +public class Lease<T> { + + /** + * The resource for which this lease is created. + */ + private final T resource; + + private final long creationTime; + + /** + * Lease lifetime in milliseconds. + */ + private volatile long leaseTimeout; + + private boolean expired; + + /** + * Functions to be called in case of timeout. + */ + private List<Callable<Void>> callbacks; + + + /** + * Creates a lease on the specified resource with given timeout. + * + * @param resource + * Resource for which the lease has to be created + * @param timeout + * Lease lifetime in milliseconds + */ + public Lease(T resource, long timeout) { + this.resource = resource; + this.leaseTimeout = timeout; + this.callbacks = Collections.synchronizedList(new ArrayList<>()); + this.creationTime = Time.monotonicNow(); + this.expired = false; + } + + /** + * Returns true if the lease has expired, else false. + * + * @return true if expired, else false + */ + public boolean hasExpired() { + return expired; + } + + /** + * Registers a callback which will be executed in case of timeout. Callbacks + * are executed in a separate Thread. + * + * @param callback + * The Callable which has to be executed + * @throws LeaseExpiredException + * If the lease has already timed out + */ + public void registerCallBack(Callable<Void> callback) + throws LeaseExpiredException { + if(hasExpired()) { + throw new LeaseExpiredException("Resource: " + resource); + } + callbacks.add(callback); + } + + /** + * Returns the time elapsed since the creation of lease. + * + * @return elapsed time in milliseconds + * @throws LeaseExpiredException + * If the lease has already timed out + */ + public long getElapsedTime() throws LeaseExpiredException { + if(hasExpired()) { + throw new LeaseExpiredException("Resource: " + resource); + } + return Time.monotonicNow() - creationTime; + } + + /** + * Returns the time available before timeout. + * + * @return remaining time in milliseconds + * @throws LeaseExpiredException + * If the lease has already timed out + */ + public long getRemainingTime() throws LeaseExpiredException { + if(hasExpired()) { + throw new LeaseExpiredException("Resource: " + resource); + } + return leaseTimeout - getElapsedTime(); + } + + /** + * Returns total lease lifetime. + * + * @return total lifetime of lease in milliseconds + * @throws LeaseExpiredException + * If the lease has already timed out + */ + public long getLeaseLifeTime() throws LeaseExpiredException { + if(hasExpired()) { + throw new LeaseExpiredException("Resource: " + resource); + } + return leaseTimeout; + } + + /** + * Renews the lease timeout period. + * + * @param timeout + * Time to be added to the lease in milliseconds + * @throws LeaseExpiredException + * If the lease has already timed out + */ + public void renew(long timeout) throws LeaseExpiredException { + if(hasExpired()) { + throw new LeaseExpiredException("Resource: " + resource); + } + leaseTimeout += timeout; + } + + @Override + public int hashCode() { + return resource.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if(obj instanceof Lease) { + return resource.equals(((Lease) obj).resource); + } + return false; + } + + @Override + public String toString() { + return "Lease<" + resource.toString() + ">"; + } + + /** + * Returns the callbacks to be executed for the lease in case of timeout. + * + * @return callbacks to be executed + */ + List<Callable<Void>> getCallbacks() { + return callbacks; + } + + /** + * Expires/Invalidates the lease. + */ + void invalidate() { + callbacks = null; + expired = true; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java new file mode 100644 index 0000000..a39ea22 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.lease; + +/** + * This exception represents that there is already a lease acquired on the + * same resource. + */ +public class LeaseAlreadyExistException extends LeaseException { + + /** + * Constructs an {@code LeaseAlreadyExistException} with {@code null} + * as its error detail message. + */ + public LeaseAlreadyExistException() { + super(); + } + + /** + * Constructs an {@code LeaseAlreadyExistException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public LeaseAlreadyExistException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java new file mode 100644 index 0000000..1b7391b --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.lease; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.Callable; + +/** + * This class is responsible for executing the callbacks of a lease in case of + * timeout. + */ +public class LeaseCallbackExecutor<T> implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(Lease.class); + + private final T resource; + private final List<Callable<Void>> callbacks; + + /** + * Constructs LeaseCallbackExecutor instance with list of callbacks. + * + * @param resource + * The resource for which the callbacks are executed + * @param callbacks + * Callbacks to be executed by this executor + */ + public LeaseCallbackExecutor(T resource, List<Callable<Void>> callbacks) { + this.resource = resource; + this.callbacks = callbacks; + } + + @Override + public void run() { + if(LOG.isDebugEnabled()) { + LOG.debug("Executing callbacks for lease on {}", resource); + } + for(Callable<Void> callback : callbacks) { + try { + callback.call(); + } catch (Exception e) { + LOG.warn("Exception while executing callback for lease on {}", + resource, e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java new file mode 100644 index 0000000..418f412 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.lease; + +/** + * This exception represents all lease related exceptions. + */ +public class LeaseException extends Exception { + + /** + * Constructs an {@code LeaseException} with {@code null} + * as its error detail message. + */ + public LeaseException() { + super(); + } + + /** + * Constructs an {@code LeaseException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public LeaseException(String message) { + super(message); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java new file mode 100644 index 0000000..440a023 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.lease; + +/** + * This exception represents that the lease that is being accessed has expired. + */ +public class LeaseExpiredException extends LeaseException { + + /** + * Constructs an {@code LeaseExpiredException} with {@code null} + * as its error detail message. + */ + public LeaseExpiredException() { + super(); + } + + /** + * Constructs an {@code LeaseExpiredException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public LeaseExpiredException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java new file mode 100644 index 0000000..b8390dd --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.lease; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * LeaseManager is someone who can provide you leases based on your + * requirement. If you want to return the lease back before it expires, + * you can give it back to Lease Manager. He is the one responsible for + * the lifecycle of leases. The resource for which lease is created + * should have proper {@code equals} method implementation, resource + * equality is checked while the lease is created. + * + * @param <T> Type of leases that this lease manager can create + */ +public class LeaseManager<T> { + + private static final Logger LOG = + LoggerFactory.getLogger(LeaseManager.class); + + private final long defaultTimeout; + private Map<T, Lease<T>> activeLeases; + private LeaseMonitor leaseMonitor; + private Thread leaseMonitorThread; + private boolean isRunning; + + /** + * Creates an instance of lease manager. + * + * @param defaultTimeout + * Default timeout in milliseconds to be used for lease creation. + */ + public LeaseManager(long defaultTimeout) { + this.defaultTimeout = defaultTimeout; + } + + /** + * Starts the lease manager service. + */ + public void start() { + LOG.debug("Starting LeaseManager service"); + activeLeases = new ConcurrentHashMap<>(); + leaseMonitor = new LeaseMonitor(); + leaseMonitorThread = new Thread(leaseMonitor); + leaseMonitorThread.setName("LeaseManager#LeaseMonitor"); + leaseMonitorThread.setDaemon(true); + leaseMonitorThread.setUncaughtExceptionHandler((thread, throwable) -> { + // Let us just restart this thread after logging an error. + // if this thread is not running we cannot handle Lease expiry. + LOG.error("LeaseMonitor thread encountered an error. Thread: {}", + thread.toString(), throwable); + leaseMonitorThread.start(); + }); + LOG.debug("Starting LeaseManager#LeaseMonitor Thread"); + leaseMonitorThread.start(); + isRunning = true; + } + + /** + * Returns a lease for the specified resource with default timeout. + * + * @param resource + * Resource for which lease has to be created + * @throws LeaseAlreadyExistException + * If there is already a lease on the resource + */ + public synchronized Lease<T> acquire(T resource) + throws LeaseAlreadyExistException { + return acquire(resource, defaultTimeout); + } + + /** + * Returns a lease for the specified resource with the timeout provided. + * + * @param resource + * Resource for which lease has to be created + * @param timeout + * The timeout in milliseconds which has to be set on the lease + * @throws LeaseAlreadyExistException + * If there is already a lease on the resource + */ + public synchronized Lease<T> acquire(T resource, long timeout) + throws LeaseAlreadyExistException { + checkStatus(); + if(LOG.isDebugEnabled()) { + LOG.debug("Acquiring lease on {} for {} milliseconds", resource, timeout); + } + if(activeLeases.containsKey(resource)) { + throw new LeaseAlreadyExistException("Resource: " + resource); + } + Lease<T> lease = new Lease<>(resource, timeout); + activeLeases.put(resource, lease); + leaseMonitorThread.interrupt(); + return lease; + } + + /** + * Returns a lease associated with the specified resource. + * + * @param resource + * Resource for which the lease has to be returned + * @throws LeaseNotFoundException + * If there is no active lease on the resource + */ + public Lease<T> get(T resource) throws LeaseNotFoundException { + checkStatus(); + Lease<T> lease = activeLeases.get(resource); + if(lease != null) { + return lease; + } + throw new LeaseNotFoundException("Resource: " + resource); + } + + /** + * Releases the lease associated with the specified resource. + * + * @param resource + * The for which the lease has to be released + * @throws LeaseNotFoundException + * If there is no active lease on the resource + */ + public synchronized void release(T resource) + throws LeaseNotFoundException { + checkStatus(); + if(LOG.isDebugEnabled()) { + LOG.debug("Releasing lease on {}", resource); + } + Lease<T> lease = activeLeases.remove(resource); + if(lease == null) { + throw new LeaseNotFoundException("Resource: " + resource); + } + lease.invalidate(); + } + + /** + * Shuts down the LeaseManager and releases the resources. All the active + * {@link Lease} will be released (callbacks on leases will not be + * executed). + */ + public void shutdown() { + checkStatus(); + LOG.debug("Shutting down LeaseManager service"); + leaseMonitor.disable(); + leaseMonitorThread.interrupt(); + for(T resource : activeLeases.keySet()) { + try { + release(resource); + } catch(LeaseNotFoundException ex) { + //Ignore the exception, someone might have released the lease + } + } + isRunning = false; + } + + /** + * Throws {@link LeaseManagerNotRunningException} if the service is not + * running. + */ + private void checkStatus() { + if(!isRunning) { + throw new LeaseManagerNotRunningException("LeaseManager not running."); + } + } + + /** + * Monitors the leases and expires them based on the timeout, also + * responsible for executing the callbacks of expired leases. + */ + private final class LeaseMonitor implements Runnable { + + private boolean monitor = true; + private ExecutorService executorService; + + private LeaseMonitor() { + this.monitor = true; + this.executorService = Executors.newCachedThreadPool(); + } + + @Override + public void run() { + while(monitor) { + LOG.debug("LeaseMonitor: checking for lease expiry"); + long sleepTime = Long.MAX_VALUE; + + for (T resource : activeLeases.keySet()) { + try { + Lease<T> lease = get(resource); + long remainingTime = lease.getRemainingTime(); + if (remainingTime <= 0) { + //Lease has timed out + List<Callable<Void>> leaseCallbacks = lease.getCallbacks(); + release(resource); + executorService.execute( + new LeaseCallbackExecutor(resource, leaseCallbacks)); + } else { + sleepTime = remainingTime > sleepTime ? + sleepTime : remainingTime; + } + } catch (LeaseNotFoundException | LeaseExpiredException ex) { + //Ignore the exception, someone might have released the lease + } + } + + try { + if(!Thread.interrupted()) { + Thread.sleep(sleepTime); + } + } catch (InterruptedException ignored) { + // This means a new lease is added to activeLeases. + } + } + } + + /** + * Disables lease monitor, next interrupt call on the thread + * will stop lease monitor. + */ + public void disable() { + monitor = false; + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java new file mode 100644 index 0000000..ced31de --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.lease; + +/** + * This exception represents that there LeaseManager service is not running. + */ +public class LeaseManagerNotRunningException extends RuntimeException { + + /** + * Constructs an {@code LeaseManagerNotRunningException} with {@code null} + * as its error detail message. + */ + public LeaseManagerNotRunningException() { + super(); + } + + /** + * Constructs an {@code LeaseManagerNotRunningException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public LeaseManagerNotRunningException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java new file mode 100644 index 0000000..c292d33 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.lease; + +/** + * This exception represents that the lease that is being accessed does not + * exist. + */ +public class LeaseNotFoundException extends LeaseException { + + /** + * Constructs an {@code LeaseNotFoundException} with {@code null} + * as its error detail message. + */ + public LeaseNotFoundException() { + super(); + } + + /** + * Constructs an {@code LeaseNotFoundException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public LeaseNotFoundException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/package-info.java new file mode 100644 index 0000000..48ee2e1 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/package-info.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * A generic lease management API which can be used if a service + * needs any kind of lease management. + */ + +package org.apache.hadoop.ozone.lease; +/* + This package contains lease management related classes. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/package-info.java new file mode 100644 index 0000000..db399db --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/package-info.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone; + +/** + This package contains class that support ozone implementation on the datanode + side. + + Main parts of ozone on datanode are: + + 1. REST Interface - This code lives under the web directory and listens to the + WebHDFS port. + + 2. Datanode container classes: This support persistence of ozone objects on + datanode. These classes live under container directory. + + 3. Client and Shell: We also support a ozone REST client lib, they are under + web/client and web/ozShell. + + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..fa79341 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import com.google.common.collect.Sets; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .AllocateScmBlockRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .AllocateScmBlockResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .DeleteKeyBlocksResultProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .DeleteScmKeyBlocksRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .DeleteScmKeyBlocksResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .GetScmBlockLocationsRequestProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .GetScmBlockLocationsResponseProto; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .ScmLocatedBlockProto; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class is the server-side translator that forwards requests received on + * {@link StorageContainerLocationProtocolPB} to the + * {@link StorageContainerLocationProtocol} server implementation. + */ [email protected] +public final class ScmBlockLocationProtocolServerSideTranslatorPB + implements ScmBlockLocationProtocolPB { + + private final ScmBlockLocationProtocol impl; + + /** + * Creates a new ScmBlockLocationProtocolServerSideTranslatorPB. + * + * @param impl {@link ScmBlockLocationProtocol} server implementation + */ + public ScmBlockLocationProtocolServerSideTranslatorPB( + ScmBlockLocationProtocol impl) throws IOException { + this.impl = impl; + } + + + @Override + public GetScmBlockLocationsResponseProto getScmBlockLocations( + RpcController controller, GetScmBlockLocationsRequestProto req) + throws ServiceException { + Set<String> keys = Sets.newLinkedHashSetWithExpectedSize( + req.getKeysCount()); + for (String key : req.getKeysList()) { + keys.add(key); + } + final Set<AllocatedBlock> blocks; + try { + blocks = impl.getBlockLocations(keys); + } catch (IOException ex) { + throw new ServiceException(ex); + } + GetScmBlockLocationsResponseProto.Builder resp = + GetScmBlockLocationsResponseProto.newBuilder(); + for (AllocatedBlock block: blocks) { + ScmLocatedBlockProto.Builder locatedBlock = + ScmLocatedBlockProto.newBuilder() + .setKey(block.getKey()) + .setPipeline(block.getPipeline().getProtobufMessage()); + resp.addLocatedBlocks(locatedBlock.build()); + } + return resp.build(); + } + + @Override + public AllocateScmBlockResponseProto allocateScmBlock( + RpcController controller, AllocateScmBlockRequestProto request) + throws ServiceException { + try { + AllocatedBlock allocatedBlock = + impl.allocateBlock(request.getSize(), request.getType(), + request.getFactor(), request.getOwner()); + if (allocatedBlock != null) { + return + AllocateScmBlockResponseProto.newBuilder() + .setKey(allocatedBlock.getKey()) + .setPipeline(allocatedBlock.getPipeline().getProtobufMessage()) + .setCreateContainer(allocatedBlock.getCreateContainer()) + .setErrorCode(AllocateScmBlockResponseProto.Error.success) + .build(); + } else { + return AllocateScmBlockResponseProto.newBuilder() + .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure) + .build(); + } + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks( + RpcController controller, DeleteScmKeyBlocksRequestProto req) + throws ServiceException { + DeleteScmKeyBlocksResponseProto.Builder resp = + DeleteScmKeyBlocksResponseProto.newBuilder(); + try { + List<BlockGroup> infoList = req.getKeyBlocksList().stream() + .map(BlockGroup::getFromProto).collect(Collectors.toList()); + final List<DeleteBlockGroupResult> results = + impl.deleteKeyBlocks(infoList); + for (DeleteBlockGroupResult result: results) { + DeleteKeyBlocksResultProto.Builder deleteResult = + DeleteKeyBlocksResultProto + .newBuilder() + .setObjectKey(result.getObjectKey()) + .addAllBlockResults(result.getBlockResultProtoList()); + resp.addResults(deleteResult.build()); + } + } catch (IOException ex) { + throw new ServiceException(ex); + } + return resp.build(); + } + + @Override + public HddsProtos.GetScmInfoRespsonseProto getScmInfo( + RpcController controller, HddsProtos.GetScmInfoRequestProto req) + throws ServiceException { + ScmInfo scmInfo; + try { + scmInfo = impl.getScmInfo(); + } catch (IOException ex) { + throw new ServiceException(ex); + } + return HddsProtos.GetScmInfoRespsonseProto.newBuilder() + .setClusterId(scmInfo.getClusterId()) + .setScmId(scmInfo.getScmId()) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..4974268 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -0,0 +1,212 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.GetContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.GetContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.PipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.PipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.List; + +/** + * This class is the server-side translator that forwards requests received on + * {@link StorageContainerLocationProtocolPB} to the + * {@link StorageContainerLocationProtocol} server implementation. + */ [email protected] +public final class StorageContainerLocationProtocolServerSideTranslatorPB + implements StorageContainerLocationProtocolPB { + + private final StorageContainerLocationProtocol impl; + + /** + * Creates a new StorageContainerLocationProtocolServerSideTranslatorPB. + * + * @param impl {@link StorageContainerLocationProtocol} server implementation + */ + public StorageContainerLocationProtocolServerSideTranslatorPB( + StorageContainerLocationProtocol impl) throws IOException { + this.impl = impl; + } + + @Override + public ContainerResponseProto allocateContainer(RpcController unused, + ContainerRequestProto request) throws ServiceException { + try { + Pipeline pipeline = impl.allocateContainer(request.getReplicationType(), + request.getReplicationFactor(), request.getContainerName(), + request.getOwner()); + return ContainerResponseProto.newBuilder() + .setPipeline(pipeline.getProtobufMessage()) + .setErrorCode(ContainerResponseProto.Error.success) + .build(); + + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetContainerResponseProto getContainer( + RpcController controller, GetContainerRequestProto request) + throws ServiceException { + try { + Pipeline pipeline = impl.getContainer(request.getContainerName()); + return GetContainerResponseProto.newBuilder() + .setPipeline(pipeline.getProtobufMessage()) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SCMListContainerResponseProto listContainer(RpcController controller, + SCMListContainerRequestProto request) throws ServiceException { + try { + String startName = null; + String prefixName = null; + int count = -1; + + // Arguments check. + if (request.hasPrefixName()) { + // End container name is given. + prefixName = request.getPrefixName(); + } + if (request.hasStartName()) { + // End container name is given. + startName = request.getStartName(); + } + + count = request.getCount(); + List<ContainerInfo> containerList = + impl.listContainer(startName, prefixName, count); + SCMListContainerResponseProto.Builder builder = + SCMListContainerResponseProto.newBuilder(); + for (ContainerInfo container : containerList) { + builder.addContainers(container.getProtobuf()); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SCMDeleteContainerResponseProto deleteContainer( + RpcController controller, SCMDeleteContainerRequestProto request) + throws ServiceException { + try { + impl.deleteContainer(request.getContainerName()); + return SCMDeleteContainerResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public StorageContainerLocationProtocolProtos.NodeQueryResponseProto + queryNode(RpcController controller, + StorageContainerLocationProtocolProtos.NodeQueryRequestProto request) + throws ServiceException { + try { + EnumSet<HddsProtos.NodeState> nodeStateEnumSet = EnumSet.copyOf(request + .getQueryList()); + HddsProtos.NodePool datanodes = impl.queryNode(nodeStateEnumSet, + request.getScope(), request.getPoolName()); + return StorageContainerLocationProtocolProtos + .NodeQueryResponseProto.newBuilder() + .setDatanodes(datanodes) + .build(); + } catch (Exception e) { + throw new ServiceException(e); + } + } + + @Override + public ObjectStageChangeResponseProto notifyObjectStageChange( + RpcController controller, ObjectStageChangeRequestProto request) + throws ServiceException { + try { + impl.notifyObjectStageChange(request.getType(), request.getName(), + request.getOp(), request.getStage()); + return ObjectStageChangeResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public PipelineResponseProto allocatePipeline( + RpcController controller, PipelineRequestProto request) + throws ServiceException { + // TODO : Wiring this up requires one more patch. + return null; + } + + @Override + public HddsProtos.GetScmInfoRespsonseProto getScmInfo( + RpcController controller, HddsProtos.GetScmInfoRequestProto req) + throws ServiceException { + try { + ScmInfo scmInfo = impl.getScmInfo(); + return HddsProtos.GetScmInfoRespsonseProto.newBuilder() + .setClusterId(scmInfo.getClusterId()) + .setScmId(scmInfo.getScmId()) + .build(); + } catch (IOException ex) { + throw new ServiceException(ex); + } + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java new file mode 100644 index 0000000..860386d --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocolPB; + +/** + * This package contains classes for the Protocol Buffers binding of Ozone + * protocols. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/web/utils/JsonUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/web/utils/JsonUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/web/utils/JsonUtils.java new file mode 100644 index 0000000..af56da3 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/web/utils/JsonUtils.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.type.CollectionType; + +import java.io.IOException; +import java.util.List; + +/** + * JSON Utility functions used in ozone. + */ +public final class JsonUtils { + + // Reuse ObjectMapper instance for improving performance. + // ObjectMapper is thread safe as long as we always configure instance + // before use. + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectReader READER = MAPPER.readerFor(Object.class); + private static final ObjectWriter WRITTER = + MAPPER.writerWithDefaultPrettyPrinter(); + + private JsonUtils() { + // Never constructed + } + + public static String toJsonStringWithDefaultPrettyPrinter(String jsonString) + throws IOException { + Object json = READER.readValue(jsonString); + return WRITTER.writeValueAsString(json); + } + + public static String toJsonString(Object obj) throws IOException { + return MAPPER.writeValueAsString(obj); + } + + /** + * Deserialize a list of elements from a given string, + * each element in the list is in the given type. + * + * @param str json string. + * @param elementType element type. + * @return List of elements of type elementType + * @throws IOException + */ + public static List<?> toJsonList(String str, Class<?> elementType) + throws IOException { + CollectionType type = MAPPER.getTypeFactory() + .constructCollectionType(List.class, elementType); + return MAPPER.readValue(str, type); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java new file mode 100644 index 0000000..431da64 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * An abstract class for a background service in ozone. + * A background service schedules multiple child tasks in parallel + * in a certain period. In each interval, it waits until all the tasks + * finish execution and then schedule next interval. + */ +public abstract class BackgroundService { + + @VisibleForTesting + public static final Logger LOG = + LoggerFactory.getLogger(BackgroundService.class); + + // Executor to launch child tasks + private final ScheduledExecutorService exec; + private final ThreadGroup threadGroup; + private final ThreadFactory threadFactory; + private final String serviceName; + private final long interval; + private final long serviceTimeout; + private final TimeUnit unit; + private final PeriodicalTask service; + + public BackgroundService(String serviceName, long interval, + TimeUnit unit, int threadPoolSize, long serviceTimeout) { + this.interval = interval; + this.unit = unit; + this.serviceName = serviceName; + this.serviceTimeout = serviceTimeout; + threadGroup = new ThreadGroup(serviceName); + ThreadFactory tf = r -> new Thread(threadGroup, r); + threadFactory = new ThreadFactoryBuilder() + .setThreadFactory(tf) + .setDaemon(true) + .setNameFormat(serviceName + "#%d") + .build(); + exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory); + service = new PeriodicalTask(); + } + + protected ExecutorService getExecutorService() { + return this.exec; + } + + @VisibleForTesting + public int getThreadCount() { + return threadGroup.activeCount(); + } + + @VisibleForTesting + public void triggerBackgroundTaskForTesting() { + service.run(); + } + + // start service + public void start() { + exec.scheduleWithFixedDelay(service, 0, interval, unit); + } + + public abstract BackgroundTaskQueue getTasks(); + + /** + * Run one or more background tasks concurrently. + * Wait until all tasks to return the result. + */ + public class PeriodicalTask implements Runnable { + @Override + public synchronized void run() { + LOG.debug("Running background service : {}", serviceName); + BackgroundTaskQueue tasks = getTasks(); + if (tasks.isEmpty()) { + // No task found, or some problems to init tasks + // return and retry in next interval. + return; + } + + LOG.debug("Number of background tasks to execute : {}", tasks.size()); + CompletionService<BackgroundTaskResult> taskCompletionService = + new ExecutorCompletionService<>(exec); + + List<Future<BackgroundTaskResult>> results = Lists.newArrayList(); + while (tasks.size() > 0) { + BackgroundTask task = tasks.poll(); + Future<BackgroundTaskResult> result = + taskCompletionService.submit(task); + results.add(result); + } + + results.parallelStream().forEach(taskResultFuture -> { + try { + // Collect task results + BackgroundTaskResult result = serviceTimeout > 0 + ? taskResultFuture.get(serviceTimeout, TimeUnit.MILLISECONDS) + : taskResultFuture.get(); + if (LOG.isDebugEnabled()) { + LOG.debug("task execution result size {}", result.getSize()); + } + } catch (InterruptedException | ExecutionException e) { + LOG.warn( + "Background task fails to execute, " + + "retrying in next interval", e); + } catch (TimeoutException e) { + LOG.warn("Background task executes timed out, " + + "retrying in next interval", e); + } + }); + } + } + + // shutdown and make sure all threads are properly released. + public void shutdown() { + LOG.info("Shutting down service {}", this.serviceName); + exec.shutdown(); + try { + if (!exec.awaitTermination(60, TimeUnit.SECONDS)) { + exec.shutdownNow(); + } + } catch (InterruptedException e) { + exec.shutdownNow(); + } + if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) { + threadGroup.destroy(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java new file mode 100644 index 0000000..47e8ebc --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +import java.util.concurrent.Callable; + +/** + * A task thread to run by {@link BackgroundService}. + */ +public interface BackgroundTask<T> extends Callable<T> { + + int getPriority(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java new file mode 100644 index 0000000..b56ef0c --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +import java.util.PriorityQueue; + +/** + * A priority queue that stores a number of {@link BackgroundTask}. + */ +public class BackgroundTaskQueue { + + private final PriorityQueue<BackgroundTask> tasks; + + public BackgroundTaskQueue() { + tasks = new PriorityQueue<>((task1, task2) + -> task1.getPriority() - task2.getPriority()); + } + + /** + * @return the head task in this queue. + */ + public synchronized BackgroundTask poll() { + return tasks.poll(); + } + + /** + * Add a {@link BackgroundTask} to the queue, + * the task will be sorted by its priority. + * + * @param task + */ + public synchronized void add(BackgroundTask task) { + tasks.add(task); + } + + /** + * @return true if the queue contains no task, false otherwise. + */ + public synchronized boolean isEmpty() { + return tasks.isEmpty(); + } + + /** + * @return the size of the queue. + */ + public synchronized int size() { + return tasks.size(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java new file mode 100644 index 0000000..198300f --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.utils; + +/** + * Result of a {@link BackgroundTask}. + */ +public interface BackgroundTaskResult { + + /** + * Returns the size of entries included in this result. + */ + int getSize(); + + /** + * An empty task result implementation. + */ + class EmptyTaskResult implements BackgroundTaskResult { + + public static EmptyTaskResult newResult() { + return new EmptyTaskResult(); + } + + @Override + public int getSize() { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java new file mode 100644 index 0000000..47699eb --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * An utility class to store a batch of DB write operations. + */ +public class BatchOperation { + + /** + * Enum for write operations. + */ + public enum Operation { + DELETE, PUT + } + + private List<SingleOperation> operations = + Lists.newArrayList(); + + /** + * Add a PUT operation into the batch. + */ + public void put(byte[] key, byte[] value) { + operations.add(new SingleOperation(Operation.PUT, key, value)); + } + + /** + * Add a DELETE operation into the batch. + */ + public void delete(byte[] key) { + operations.add(new SingleOperation(Operation.DELETE, key, null)); + + } + + public List<SingleOperation> getOperations() { + return operations; + } + + /** + * A SingleOperation represents a PUT or DELETE operation + * and the data the operation needs to manipulates. + */ + public static class SingleOperation { + + private Operation opt; + private byte[] key; + private byte[] value; + + public SingleOperation(Operation opt, byte[] key, byte[] value) { + this.opt = opt; + if (key == null) { + throw new IllegalArgumentException("key cannot be null"); + } + this.key = key.clone(); + this.value = value == null ? null : value.clone(); + } + + public Operation getOpt() { + return opt; + } + + public byte[] getKey() { + return key.clone(); + } + + public byte[] getValue() { + return value == null ? null : value.clone(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java new file mode 100644 index 0000000..c407398 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import java.io.IOException; + +/** + * A consumer for metadata store key-value entries. + * Used by {@link MetadataStore} class. + */ +@FunctionalInterface +public interface EntryConsumer { + + /** + * Consumes a key and value and produces a boolean result. + * @param key key + * @param value value + * @return a boolean value produced by the consumer + * @throws IOException + */ + boolean consume(byte[] key, byte[] value) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java new file mode 100644 index 0000000..83ca83d --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.utils; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.ReadOptions; +import org.iq80.leveldb.Snapshot; +import org.iq80.leveldb.WriteBatch; +import org.iq80.leveldb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * LevelDB interface. + */ +public class LevelDBStore implements MetadataStore { + + private static final Logger LOG = + LoggerFactory.getLogger(LevelDBStore.class); + + private DB db; + private final File dbFile; + private final Options dbOptions; + private final WriteOptions writeOptions; + + public LevelDBStore(File dbPath, boolean createIfMissing) + throws IOException { + dbOptions = new Options(); + dbOptions.createIfMissing(createIfMissing); + this.dbFile = dbPath; + this.writeOptions = new WriteOptions().sync(true); + openDB(dbPath, dbOptions); + } + + /** + * Opens a DB file. + * + * @param dbPath - DB File path + * @throws IOException + */ + public LevelDBStore(File dbPath, Options options) + throws IOException { + dbOptions = options; + this.dbFile = dbPath; + this.writeOptions = new WriteOptions().sync(true); + openDB(dbPath, dbOptions); + } + + private void openDB(File dbPath, Options options) throws IOException { + dbPath.getParentFile().mkdirs(); + db = JniDBFactory.factory.open(dbPath, options); + if (LOG.isDebugEnabled()) { + LOG.debug("LevelDB successfully opened"); + LOG.debug("[Option] cacheSize = " + options.cacheSize()); + LOG.debug("[Option] createIfMissing = " + options.createIfMissing()); + LOG.debug("[Option] blockSize = " + options.blockSize()); + LOG.debug("[Option] compressionType= " + options.compressionType()); + LOG.debug("[Option] maxOpenFiles= " + options.maxOpenFiles()); + LOG.debug("[Option] writeBufferSize= "+ options.writeBufferSize()); + } + } + + /** + * Puts a Key into file. + * + * @param key - key + * @param value - value + */ + @Override + public void put(byte[] key, byte[] value) { + db.put(key, value, writeOptions); + } + + /** + * Get Key. + * + * @param key key + * @return value + */ + @Override + public byte[] get(byte[] key) { + return db.get(key); + } + + /** + * Delete Key. + * + * @param key - Key + */ + @Override + public void delete(byte[] key) { + db.delete(key); + } + + /** + * Closes the DB. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + if (db != null){ + db.close(); + } + } + + /** + * Returns true if the DB is empty. + * + * @return boolean + * @throws IOException + */ + @Override + public boolean isEmpty() throws IOException { + try (DBIterator iter = db.iterator()) { + iter.seekToFirst(); + boolean hasNext = !iter.hasNext(); + return hasNext; + } + } + + /** + * Returns the actual levelDB object. + * @return DB handle. + */ + public DB getDB() { + return db; + } + + /** + * Returns an iterator on all the key-value pairs in the DB. + * @return an iterator on DB entries. + */ + public DBIterator getIterator() { + return db.iterator(); + } + + + @Override + public void destroy() throws IOException { + close(); + JniDBFactory.factory.destroy(dbFile, dbOptions); + } + + @Override + public ImmutablePair<byte[], byte[]> peekAround(int offset, + byte[] from) throws IOException, IllegalArgumentException { + try (DBIterator it = db.iterator()) { + if (from == null) { + it.seekToFirst(); + } else { + it.seek(from); + } + if (!it.hasNext()) { + return null; + } + switch (offset) { + case 0: + Entry<byte[], byte[]> current = it.next(); + return new ImmutablePair<>(current.getKey(), current.getValue()); + case 1: + if (it.next() != null && it.hasNext()) { + Entry<byte[], byte[]> next = it.peekNext(); + return new ImmutablePair<>(next.getKey(), next.getValue()); + } + break; + case -1: + if (it.hasPrev()) { + Entry<byte[], byte[]> prev = it.peekPrev(); + return new ImmutablePair<>(prev.getKey(), prev.getValue()); + } + break; + default: + throw new IllegalArgumentException( + "Position can only be -1, 0 " + "or 1, but found " + offset); + } + } + return null; + } + + @Override + public void iterate(byte[] from, EntryConsumer consumer) + throws IOException { + try (DBIterator iter = db.iterator()) { + if (from != null) { + iter.seek(from); + } else { + iter.seekToFirst(); + } + while (iter.hasNext()) { + Entry<byte[], byte[]> current = iter.next(); + if (!consumer.consume(current.getKey(), + current.getValue())) { + break; + } + } + } + } + + /** + * Compacts the DB by removing deleted keys etc. + * @throws IOException if there is an error. + */ + @Override + public void compactDB() throws IOException { + if(db != null) { + // From LevelDB docs : begin == null and end == null means the whole DB. + db.compactRange(null, null); + } + } + + @Override + public void writeBatch(BatchOperation operation) throws IOException { + List<BatchOperation.SingleOperation> operations = + operation.getOperations(); + if (!operations.isEmpty()) { + try (WriteBatch writeBatch = db.createWriteBatch()) { + for (BatchOperation.SingleOperation opt : operations) { + switch (opt.getOpt()) { + case DELETE: + writeBatch.delete(opt.getKey()); + break; + case PUT: + writeBatch.put(opt.getKey(), opt.getValue()); + break; + default: + throw new IllegalArgumentException("Invalid operation " + + opt.getOpt()); + } + } + db.write(writeBatch); + } + } + } + + @Override + public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey, + int count, MetadataKeyFilters.MetadataKeyFilter... filters) + throws IOException, IllegalArgumentException { + return getRangeKVs(startKey, count, false, filters); + } + + @Override + public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey, + int count, MetadataKeyFilters.MetadataKeyFilter... filters) + throws IOException, IllegalArgumentException { + return getRangeKVs(startKey, count, true, filters); + } + + /** + * Returns a certain range of key value pairs as a list based on a + * startKey or count. Further a {@link MetadataKeyFilter} can be added to + * filter keys if necessary. To prevent race conditions while listing + * entries, this implementation takes a snapshot and lists the entries from + * the snapshot. This may, on the other hand, cause the range result slight + * different with actual data if data is updating concurrently. + * <p> + * If the startKey is specified and found in levelDB, this key and the keys + * after this key will be included in the result. If the startKey is null + * all entries will be included as long as other conditions are satisfied. + * If the given startKey doesn't exist, an empty list will be returned. + * <p> + * The count argument is to limit number of total entries to return, + * the value for count must be an integer greater than 0. + * <p> + * This method allows to specify one or more {@link MetadataKeyFilter} + * to filter keys by certain condition. Once given, only the entries + * whose key passes all the filters will be included in the result. + * + * @param startKey a start key. + * @param count max number of entries to return. + * @param filters customized one or more {@link MetadataKeyFilter}. + * @return a list of entries found in the database or an empty list if the + * startKey is invalid. + * @throws IOException if there are I/O errors. + * @throws IllegalArgumentException if count is less than 0. + */ + private List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey, + int count, boolean sequential, MetadataKeyFilter... filters) + throws IOException { + List<Entry<byte[], byte[]>> result = new ArrayList<>(); + long start = System.currentTimeMillis(); + if (count < 0) { + throw new IllegalArgumentException( + "Invalid count given " + count + ", count must be greater than 0"); + } + Snapshot snapShot = null; + DBIterator dbIter = null; + try { + snapShot = db.getSnapshot(); + ReadOptions readOptions = new ReadOptions().snapshot(snapShot); + dbIter = db.iterator(readOptions); + if (startKey == null) { + dbIter.seekToFirst(); + } else { + if (db.get(startKey) == null) { + // Key not found, return empty list + return result; + } + dbIter.seek(startKey); + } + while (dbIter.hasNext() && result.size() < count) { + byte[] preKey = dbIter.hasPrev() ? dbIter.peekPrev().getKey() : null; + byte[] nextKey = dbIter.hasNext() ? dbIter.peekNext().getKey() : null; + Entry<byte[], byte[]> current = dbIter.next(); + + if (filters == null) { + result.add(current); + } else { + if (Arrays.asList(filters).stream().allMatch( + entry -> entry.filterKey(preKey, current.getKey(), nextKey))) { + result.add(current); + } else { + if (result.size() > 0 && sequential) { + // if the caller asks for a sequential range of results, + // and we met a dis-match, abort iteration from here. + // if result is empty, we continue to look for the first match. + break; + } + } + } + } + } finally { + if (snapShot != null) { + snapShot.close(); + } + if (dbIter != null) { + dbIter.close(); + } + if (LOG.isDebugEnabled()) { + if (filters != null) { + for (MetadataKeyFilters.MetadataKeyFilter filter : filters) { + int scanned = filter.getKeysScannedNum(); + int hinted = filter.getKeysHintedNum(); + if (scanned > 0 || hinted > 0) { + LOG.debug( + "getRangeKVs ({}) numOfKeysScanned={}, numOfKeysHinted={}", + filter.getClass().getSimpleName(), filter.getKeysScannedNum(), + filter.getKeysHintedNum()); + } + } + } + long end = System.currentTimeMillis(); + long timeConsumed = end - start; + LOG.debug("Time consumed for getRangeKVs() is {}ms," + + " result length is {}.", timeConsumed, result.size()); + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java new file mode 100644 index 0000000..3ff0a94 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.utils; + +import com.google.common.base.Strings; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConsts; + +/** + * An utility class to filter levelDB keys. + */ +public final class MetadataKeyFilters { + + private static KeyPrefixFilter deletingKeyFilter = + new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX); + + private static KeyPrefixFilter normalKeyFilter = + new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX, + true); + + private MetadataKeyFilters() { + } + + public static KeyPrefixFilter getDeletingKeyFilter() { + return deletingKeyFilter; + } + + public static KeyPrefixFilter getNormalKeyFilter() { + return normalKeyFilter; + } + /** + * Interface for levelDB key filters. + */ + public interface MetadataKeyFilter { + /** + * Filter levelDB key with a certain condition. + * + * @param preKey previous key. + * @param currentKey current key. + * @param nextKey next key. + * @return true if a certain condition satisfied, return false otherwise. + */ + boolean filterKey(byte[] preKey, byte[] currentKey, byte[] nextKey); + + default int getKeysScannedNum() { + return 0; + } + + default int getKeysHintedNum() { + return 0; + } + } + + /** + * Utility class to filter key by a string prefix. This filter + * assumes keys can be parsed to a string. + */ + public static class KeyPrefixFilter implements MetadataKeyFilter { + + private String keyPrefix = null; + private int keysScanned = 0; + private int keysHinted = 0; + private Boolean negative; + + public KeyPrefixFilter(String keyPrefix) { + this(keyPrefix, false); + } + + public KeyPrefixFilter(String keyPrefix, boolean negative) { + this.keyPrefix = keyPrefix; + this.negative = negative; + } + + @Override + public boolean filterKey(byte[] preKey, byte[] currentKey, + byte[] nextKey) { + keysScanned++; + boolean accept = false; + if (Strings.isNullOrEmpty(keyPrefix)) { + accept = true; + } else { + if (currentKey != null && + DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) { + keysHinted++; + accept = true; + } else { + accept = false; + } + } + return (negative) ? !accept : accept; + } + + @Override + public int getKeysScannedNum() { + return keysScanned; + } + + @Override + public int getKeysHintedNum() { + return keysHinted; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
