http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java deleted file mode 100644 index 56aab0a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.util.Time.now; - -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Service to periodically check if the {@link org.apache.hadoop.hdfs.server. - * federation.store.StateStoreService StateStoreService} cached information in - * the {@link Router} is up to date. This is for performance and removes the - * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService - * StateStoreService} from the critical path in common operations. - */ -public class RouterSafemodeService extends PeriodicService { - - private static final Logger LOG = - LoggerFactory.getLogger(RouterSafemodeService.class); - - /** Router to manage safe mode. */ - private final Router router; - - /** Interval in ms to wait post startup before allowing RPC requests. */ - private long startupInterval; - /** Interval in ms after which the State Store cache is too stale. */ - private long staleInterval; - /** Start time in ms of this service. */ - private long startupTime; - - /** The time the Router enters safe mode in milliseconds. */ - private long enterSafeModeTime = now(); - - - /** - * Create a new Cache update service. - * - * @param router Router containing the cache. - */ - public RouterSafemodeService(Router router) { - super(RouterSafemodeService.class.getSimpleName()); - this.router = router; - } - - /** - * Enter safe mode. - */ - private void enter() { - LOG.info("Entering safe mode"); - enterSafeModeTime = now(); - RouterRpcServer rpcServer = router.getRpcServer(); - rpcServer.setSafeMode(true); - router.updateRouterState(RouterServiceState.SAFEMODE); - } - - /** - * Leave safe mode. - */ - private void leave() { - // Cache recently updated, leave safemode - long timeInSafemode = now() - enterSafeModeTime; - LOG.info("Leaving safe mode after {} milliseconds", timeInSafemode); - RouterMetrics routerMetrics = router.getRouterMetrics(); - if (routerMetrics == null) { - LOG.error("The Router metrics are not enabled"); - } else { - routerMetrics.setSafeModeTime(timeInSafemode); - } - RouterRpcServer rpcServer = router.getRpcServer(); - rpcServer.setSafeMode(false); - router.updateRouterState(RouterServiceState.RUNNING); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - - // Use same interval as cache update service - this.setIntervalMs(conf.getTimeDuration( - DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, - DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT, - TimeUnit.MILLISECONDS)); - - this.startupInterval = conf.getTimeDuration( - DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION, - DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT, - TimeUnit.MILLISECONDS); - LOG.info("Leave startup safe mode after {} ms", this.startupInterval); - - this.staleInterval = conf.getTimeDuration( - DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION, - DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT, - TimeUnit.MILLISECONDS); - LOG.info("Enter safe mode after {} ms without reaching the State Store", - this.staleInterval); - - this.startupTime = Time.now(); - - // Initializing the RPC server in safe mode, it will disable it later - enter(); - - super.serviceInit(conf); - } - - @Override - public void periodicInvoke() { - long now = Time.now(); - long delta = now - startupTime; - if (delta < startupInterval) { - LOG.info("Delaying safemode exit for {} milliseconds...", - this.startupInterval - delta); - return; - } - RouterRpcServer rpcServer = router.getRpcServer(); - StateStoreService stateStore = router.getStateStore(); - long cacheUpdateTime = stateStore.getCacheUpdateTime(); - boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval; - - // Always update to indicate our cache was updated - if (isCacheStale) { - if (!rpcServer.isInSafeMode()) { - enter(); - } - } else if (rpcServer.isInSafeMode()) { - // Cache recently updated, leave safe mode - leave(); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java deleted file mode 100644 index 3accbe9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -/** - * States of the Router. - */ -public enum RouterServiceState { - UNINITIALIZED, - INITIALIZING, - SAFEMODE, - RUNNING, - STOPPING, - SHUTDOWN, - EXPIRED; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java deleted file mode 100644 index 527600c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; - -import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse; - -/** - * Interface of managing the Router state. - */ -public interface RouterStateManager { - /** - * Enter safe mode and change Router state to RouterServiceState#SAFEMODE. - */ - EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request) - throws IOException; - - /** - * Leave safe mode and change Router state to RouterServiceState#RUNNING. - */ - LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request) - throws IOException; - - /** - * Verify if current Router state is safe mode. - */ - GetSafeModeResponse getSafeMode(GetSafeModeRequest request) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java deleted file mode 100644 index 327f39b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * The router package includes the core services for a HDFS federation router. - * The {@link Router} acts as a transparent proxy in front of a cluster of - * multiple NameNodes and nameservices. The {@link RouterRpcServer} exposes the - * NameNode clientProtocol and is the primary contact point for DFS clients in a - * federated cluster. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java deleted file mode 100644 index cdd4449..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Record store that takes care of caching the records in memory. - * - * @param <R> Record to store by this interface. - */ -public abstract class CachedRecordStore<R extends BaseRecord> - extends RecordStore<R> implements StateStoreCache { - - private static final Logger LOG = - LoggerFactory.getLogger(CachedRecordStore.class); - - - /** Prevent loading the cache more than once every 500 ms. */ - private static final long MIN_UPDATE_MS = 500; - - - /** Cached entries. */ - private List<R> records = new ArrayList<>(); - - /** Time stamp of the cached entries. */ - private long timestamp = -1; - - /** If the cache is initialized. */ - private boolean initialized = false; - - /** Last time the cache was updated. */ - private long lastUpdate = -1; - - /** Lock to access the memory cache. */ - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock readLock = readWriteLock.readLock(); - private final Lock writeLock = readWriteLock.writeLock(); - - /** If it should override the expired values when loading the cache. */ - private boolean override = false; - - - /** - * Create a new cached record store. - * - * @param clazz Class of the record to store. - * @param driver State Store driver. - */ - protected CachedRecordStore(Class<R> clazz, StateStoreDriver driver) { - this(clazz, driver, false); - } - - /** - * Create a new cached record store. - * - * @param clazz Class of the record to store. - * @param driver State Store driver. - * @param over If the entries should be override if they expire - */ - protected CachedRecordStore( - Class<R> clazz, StateStoreDriver driver, boolean over) { - super(clazz, driver); - - this.override = over; - } - - /** - * Check that the cache of the State Store information is available. - * - * @throws StateStoreUnavailableException If the cache is not initialized. - */ - private void checkCacheAvailable() throws StateStoreUnavailableException { - if (!this.initialized) { - throw new StateStoreUnavailableException( - "Cached State Store not initialized, " + - getRecordClass().getSimpleName() + " records not valid"); - } - } - - @Override - public boolean loadCache(boolean force) throws IOException { - // Prevent loading the cache too frequently - if (force || isUpdateTime()) { - List<R> newRecords = null; - long t = -1; - try { - QueryResult<R> result = getDriver().get(getRecordClass()); - newRecords = result.getRecords(); - t = result.getTimestamp(); - - // If we have any expired record, update the State Store - if (this.override) { - overrideExpiredRecords(result); - } - } catch (IOException e) { - LOG.error("Cannot get \"{}\" records from the State Store", - getRecordClass().getSimpleName()); - this.initialized = false; - return false; - } - - // Update cache atomically - writeLock.lock(); - try { - this.records.clear(); - this.records.addAll(newRecords); - this.timestamp = t; - this.initialized = true; - } finally { - writeLock.unlock(); - } - - // Update the metrics for the cache State Store size - StateStoreMetrics metrics = getDriver().getMetrics(); - if (metrics != null) { - String recordName = getRecordClass().getSimpleName(); - metrics.setCacheSize(recordName, this.records.size()); - } - - lastUpdate = Time.monotonicNow(); - } - return true; - } - - /** - * Check if it's time to update the cache. Update it it was never updated. - * - * @return If it's time to update this cache. - */ - private boolean isUpdateTime() { - return Time.monotonicNow() - lastUpdate > MIN_UPDATE_MS; - } - - /** - * Updates the state store with any record overrides we detected, such as an - * expired state. - * - * @param query RecordQueryResult containing the data to be inspected. - * @throws IOException - */ - public void overrideExpiredRecords(QueryResult<R> query) throws IOException { - List<R> commitRecords = new ArrayList<>(); - List<R> newRecords = query.getRecords(); - long currentDriverTime = query.getTimestamp(); - if (newRecords == null || currentDriverTime <= 0) { - LOG.error("Cannot check overrides for record"); - return; - } - for (R record : newRecords) { - if (record.checkExpired(currentDriverTime)) { - String recordName = StateStoreUtils.getRecordName(record.getClass()); - LOG.info("Override State Store record {}: {}", recordName, record); - commitRecords.add(record); - } - } - if (commitRecords.size() > 0) { - getDriver().putAll(commitRecords, true, false); - } - } - - /** - * Updates the state store with any record overrides we detected, such as an - * expired state. - * - * @param record Record record to be updated. - * @throws IOException - */ - public void overrideExpiredRecord(R record) throws IOException { - List<R> newRecords = Collections.singletonList(record); - long time = getDriver().getTime(); - QueryResult<R> query = new QueryResult<>(newRecords, time); - overrideExpiredRecords(query); - } - - /** - * Get all the cached records. - * - * @return Copy of the cached records. - * @throws StateStoreUnavailableException If the State store is not available. - */ - public List<R> getCachedRecords() throws StateStoreUnavailableException { - checkCacheAvailable(); - - List<R> ret = new LinkedList<R>(); - this.readLock.lock(); - try { - ret.addAll(this.records); - } finally { - this.readLock.unlock(); - } - return ret; - } - - /** - * Get all the cached records and the time stamp of the cache. - * - * @return Copy of the cached records and the time stamp. - * @throws StateStoreUnavailableException If the State store is not available. - */ - protected QueryResult<R> getCachedRecordsAndTimeStamp() - throws StateStoreUnavailableException { - checkCacheAvailable(); - - this.readLock.lock(); - try { - return new QueryResult<R>(this.records, this.timestamp); - } finally { - this.readLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java deleted file mode 100644 index 3e8ba6b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse; -import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; - -/** - * Management API for NameNode registrations stored in - * {@link org.apache.hadoop.hdfs.server.federation.store.records.MembershipState - * MembershipState} records. The {@link org.apache.hadoop.hdfs.server. - * federation.router.RouterHeartbeatService RouterHeartbeatService} periodically - * polls each NN to update the NameNode metadata(addresses, operational) and HA - * state(active, standby). Each NameNode may be polled by multiple - * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} - * instances. - * <p> - * Once fetched from the - * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver}, NameNode registrations are cached until the next query. - * The fetched registration data is aggregated using a quorum to determine the - * best/most accurate state for each NameNode. The cache is periodically updated - * by the @{link StateStoreCacheUpdateService}. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public abstract class MembershipStore - extends CachedRecordStore<MembershipState> { - - protected MembershipStore(StateStoreDriver driver) { - super(MembershipState.class, driver, true); - } - - /** - * Inserts or updates a namenode membership entry into the table. - * - * @param request Fully populated NamenodeHeartbeatRequest request. - * @return True if successful, false otherwise. - * @throws StateStoreUnavailableException Throws exception if the data store - * is not initialized. - * @throws IOException if the data store could not be queried or the query is - * invalid. - */ - public abstract NamenodeHeartbeatResponse namenodeHeartbeat( - NamenodeHeartbeatRequest request) throws IOException; - - /** - * Queries for a single cached registration entry matching the given - * parameters. Possible keys are the names of data structure elements Possible - * values are matching SQL "LIKE" targets. - * - * @param request Fully populated GetNamenodeRegistrationsRequest request. - * @return Single matching FederationMembershipStateEntry or null if not found - * or more than one entry matches. - * @throws StateStoreUnavailableException Throws exception if the data store - * is not initialized. - * @throws IOException if the data store could not be queried or the query is - * invalid. - */ - public abstract GetNamenodeRegistrationsResponse getNamenodeRegistrations( - GetNamenodeRegistrationsRequest request) throws IOException; - - /** - * Get the expired registrations from the registration cache. - * - * @return Expired registrations or zero-length list if none are found. - * @throws StateStoreUnavailableException Throws exception if the data store - * is not initialized. - * @throws IOException if the data store could not be queried or the query is - * invalid. - */ - public abstract GetNamenodeRegistrationsResponse - getExpiredNamenodeRegistrations(GetNamenodeRegistrationsRequest request) - throws IOException; - - /** - * Retrieves a list of registered nameservices and their associated info. - * - * @param request - * @return Collection of information for each registered nameservice. - * @throws IOException if the data store could not be queried or the query is - * invalid. - */ - public abstract GetNamespaceInfoResponse getNamespaceInfo( - GetNamespaceInfoRequest request) throws IOException; - - /** - * Overrides a cached namenode state with an updated state. - * - * @param request Fully populated OverrideNamenodeRegistrationRequest request. - * @return OverrideNamenodeRegistrationResponse - * @throws StateStoreUnavailableException if the data store is not - * initialized. - * @throws IOException if the data store could not be queried or the query is - * invalid. - */ - public abstract UpdateNamenodeRegistrationResponse updateNamenodeRegistration( - UpdateNamenodeRegistrationRequest request) throws IOException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java deleted file mode 100644 index b439659..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; - -/** - * Management API for the HDFS mount table information stored in - * {@link org.apache.hadoop.hdfs.server.federation.store.records.MountTable - * MountTable} records. The mount table contains entries that map a particular - * global namespace path one or more HDFS nameservices (NN) + target path. It is - * possible to map mount locations for root folders, directories or individual - * files. - * <p> - * Once fetched from the - * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver}, MountTable records are cached in a tree for faster access. - * Each path in the global namespace is mapped to a nameserivce ID and local - * path upon request. The cache is periodically updated by the @{link - * StateStoreCacheUpdateService}. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public abstract class MountTableStore extends CachedRecordStore<MountTable> - implements MountTableManager { - - public MountTableStore(StateStoreDriver driver) { - super(MountTable.class, driver); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java deleted file mode 100644 index 53a8b82..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.lang.reflect.Constructor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; - -/** - * Store records in the State Store. Subclasses provide interfaces to operate on - * those records. - * - * @param <R> Record to store by this interface. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public abstract class RecordStore<R extends BaseRecord> { - - private static final Log LOG = LogFactory.getLog(RecordStore.class); - - - /** Class of the record stored in this State Store. */ - private final Class<R> recordClass; - - /** State store driver backed by persistent storage. */ - private final StateStoreDriver driver; - - - /** - * Create a new store for records. - * - * @param clazz Class of the record to store. - * @param stateStoreDriver Driver for the State Store. - */ - protected RecordStore(Class<R> clazz, StateStoreDriver stateStoreDriver) { - this.recordClass = clazz; - this.driver = stateStoreDriver; - } - - /** - * Report a required record to the data store. The data store uses this to - * create/maintain storage for the record. - * - * @return The class of the required record or null if no record is required - * for this interface. - */ - public Class<R> getRecordClass() { - return this.recordClass; - } - - /** - * Get the State Store driver. - * - * @return State Store driver. - */ - protected StateStoreDriver getDriver() { - return this.driver; - } - - /** - * Build a state store API implementation interface. - * - * @param clazz The specific interface implementation to create - * @param driver The {@link StateStoreDriver} implementation in use. - * @return An initialized instance of the specified state store API - * implementation. - */ - public static <T extends RecordStore<?>> T newInstance( - final Class<T> clazz, final StateStoreDriver driver) { - - try { - Constructor<T> constructor = clazz.getConstructor(StateStoreDriver.class); - T recordStore = constructor.newInstance(driver); - return recordStore; - } catch (Exception e) { - LOG.error("Cannot create new instance for " + clazz, e); - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java deleted file mode 100644 index c6a0dad..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; -import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; - -/** - * Management API for - * {@link org.apache.hadoop.hdfs.server.federation.store.records.RouterState - * RouterState} records in the state store. Accesses the data store via the - * {@link org.apache.hadoop.hdfs.server.federation.store.driver. - * StateStoreDriver StateStoreDriver} interface. No data is cached. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public abstract class RouterStore extends CachedRecordStore<RouterState> { - - public RouterStore(StateStoreDriver driver) { - super(RouterState.class, driver, true); - } - - /** - * Fetches the current router state object. - * - * @param request Fully populated request object. - * @return The matching router record or null if none exists. - * @throws IOException Throws exception if unable to query the data store or - * if more than one matching record is found. - */ - public abstract GetRouterRegistrationResponse getRouterRegistration( - GetRouterRegistrationRequest request) throws IOException; - - /** - * Fetches all router status objects. - * - * @param request Fully populated request object. - * @return List of Router records present in the data store. - * @throws IOException Throws exception if unable to query the data store - */ - public abstract GetRouterRegistrationsResponse getRouterRegistrations( - GetRouterRegistrationsRequest request) throws IOException; - - /** - * Update the state of this router in the State Store. - * - * @param request Fully populated request object. - * @return True if the update was successfully recorded, false otherwise. - * @throws IOException Throws exception if unable to query the data store - */ - public abstract RouterHeartbeatResponse routerHeartbeat( - RouterHeartbeatRequest request) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java deleted file mode 100644 index 83fc501..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.io.IOException; - -/** - * Interface for a cached copy of the State Store. - */ -public interface StateStoreCache { - - /** - * Load the cache from the State Store. Called by the cache update service - * when the data has been reloaded. - * - * @param force If we force the load. - * @return If the cache was loaded successfully. - * @throws IOException If there was an error loading the cache. - */ - boolean loadCache(boolean force) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java deleted file mode 100644 index 9bcbc1e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Service to periodically update the {@link StateStoreService} - * cached information in the - * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router}. - * This is for performance and removes the State Store from the critical path - * in common operations. - */ -public class StateStoreCacheUpdateService extends PeriodicService { - - private static final Logger LOG = - LoggerFactory.getLogger(StateStoreCacheUpdateService.class); - - /** The service that manages the State Store connection. */ - private final StateStoreService stateStore; - - - /** - * Create a new Cache update service. - * - * @param stateStore Implementation of the state store - */ - public StateStoreCacheUpdateService(StateStoreService stateStore) { - super(StateStoreCacheUpdateService.class.getSimpleName()); - this.stateStore = stateStore; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - - this.setIntervalMs(conf.getTimeDuration( - DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, - DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT, - TimeUnit.MILLISECONDS)); - - super.serviceInit(conf); - } - - @Override - public void periodicInvoke() { - LOG.debug("Updating State Store cache"); - stateStore.refreshCaches(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java deleted file mode 100644 index 4d279c5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Service to periodically monitor the connection of the StateStore - * {@link StateStoreService} data store and to re-open the connection - * to the data store if required. - */ -public class StateStoreConnectionMonitorService extends PeriodicService { - - private static final Logger LOG = - LoggerFactory.getLogger(StateStoreConnectionMonitorService.class); - - /** Service that maintains the State Store connection. */ - private final StateStoreService stateStore; - - - /** - * Create a new service to monitor the connectivity of the state store driver. - * - * @param store Instance of the state store to be monitored. - */ - public StateStoreConnectionMonitorService(StateStoreService store) { - super(StateStoreConnectionMonitorService.class.getSimpleName()); - this.stateStore = store; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - this.setIntervalMs(conf.getLong( - DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, - DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT)); - - super.serviceInit(conf); - } - - @Override - public void periodicInvoke() { - LOG.debug("Checking state store connection"); - if (!stateStore.isDriverReady()) { - LOG.info("Attempting to open state store driver."); - stateStore.loadDriver(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java deleted file mode 100644 index aa730ae..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ /dev/null @@ -1,450 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; -import javax.management.StandardMBean; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean; -import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl; -import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; -import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; -import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; -import org.apache.hadoop.metrics2.MetricsException; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -/** - * A service to initialize a - * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver} and maintain the connection to the data store. There are - * multiple state store driver connections supported: - * <ul> - * <li>File - * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. - * StateStoreFileImpl StateStoreFileImpl} - * <li>ZooKeeper - * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. - * StateStoreZooKeeperImpl StateStoreZooKeeperImpl} - * </ul> - * <p> - * The service also supports the dynamic registration of record stores like: - * <ul> - * <li>{@link MembershipStore}: state of the Namenodes in the - * federation. - * <li>{@link MountTableStore}: Mount table between to subclusters. - * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}. - * <li>{@link RebalancerStore}: Log of the rebalancing operations. - * <li>{@link RouterStore}: Router state in the federation. - * <li>{@link TokenStore}: Tokens in the federation. - * </ul> - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class StateStoreService extends CompositeService { - - private static final Logger LOG = - LoggerFactory.getLogger(StateStoreService.class); - - - /** State Store configuration. */ - private Configuration conf; - - /** Identifier for the service. */ - private String identifier; - - /** Driver for the back end connection. */ - private StateStoreDriver driver; - - /** Service to maintain data store connection. */ - private StateStoreConnectionMonitorService monitorService; - - /** StateStore metrics. */ - private StateStoreMetrics metrics; - - /** Supported record stores. */ - private final Map< - Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>> - recordStores; - - /** Service to maintain State Store caches. */ - private StateStoreCacheUpdateService cacheUpdater; - /** Time the cache was last successfully updated. */ - private long cacheLastUpdateTime; - /** List of internal caches to update. */ - private final List<StateStoreCache> cachesToUpdateInternal; - /** List of external caches to update. */ - private final List<StateStoreCache> cachesToUpdateExternal; - - - public StateStoreService() { - super(StateStoreService.class.getName()); - - // Records and stores supported by this implementation - this.recordStores = new HashMap<>(); - - // Caches to maintain - this.cachesToUpdateInternal = new ArrayList<>(); - this.cachesToUpdateExternal = new ArrayList<>(); - } - - /** - * Initialize the State Store and the connection to the backend. - * - * @param config Configuration for the State Store. - * @throws IOException - */ - @Override - protected void serviceInit(Configuration config) throws Exception { - this.conf = config; - - // Create implementation of State Store - Class<? extends StateStoreDriver> driverClass = this.conf.getClass( - DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS, - DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT, - StateStoreDriver.class); - this.driver = ReflectionUtils.newInstance(driverClass, this.conf); - - if (this.driver == null) { - throw new IOException("Cannot create driver for the State Store"); - } - - // Add supported record stores - addRecordStore(MembershipStoreImpl.class); - addRecordStore(MountTableStoreImpl.class); - addRecordStore(RouterStoreImpl.class); - - // Check the connection to the State Store periodically - this.monitorService = new StateStoreConnectionMonitorService(this); - this.addService(monitorService); - - // Set expirations intervals for each record - MembershipState.setExpirationMs(conf.getLong( - DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, - DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT)); - - RouterState.setExpirationMs(conf.getTimeDuration( - DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS, - DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT, - TimeUnit.MILLISECONDS)); - - // Cache update service - this.cacheUpdater = new StateStoreCacheUpdateService(this); - addService(this.cacheUpdater); - - // Create metrics for the State Store - this.metrics = StateStoreMetrics.create(conf); - - // Adding JMX interface - try { - StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class); - ObjectName registeredObject = - MBeans.register("Router", "StateStore", bean); - LOG.info("Registered StateStoreMBean: {}", registeredObject); - } catch (NotCompliantMBeanException e) { - throw new RuntimeException("Bad StateStoreMBean setup", e); - } catch (MetricsException e) { - LOG.info("Failed to register State Store bean {}", e.getMessage()); - } - - super.serviceInit(this.conf); - } - - @Override - protected void serviceStart() throws Exception { - loadDriver(); - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - closeDriver(); - - if (metrics != null) { - metrics.shutdown(); - metrics = null; - } - - super.serviceStop(); - } - - /** - * Add a record store to the State Store. It includes adding the store, the - * supported record and the cache management. - * - * @param clazz Class of the record store to track. - * @return New record store. - * @throws ReflectiveOperationException - */ - private <T extends RecordStore<?>> void addRecordStore( - final Class<T> clazz) throws ReflectiveOperationException { - - assert this.getServiceState() == STATE.INITED : - "Cannot add record to the State Store once started"; - - T recordStore = RecordStore.newInstance(clazz, this.getDriver()); - Class<? extends BaseRecord> recordClass = recordStore.getRecordClass(); - this.recordStores.put(recordClass, recordStore); - - // Subscribe for cache updates - if (recordStore instanceof StateStoreCache) { - StateStoreCache cachedRecordStore = (StateStoreCache) recordStore; - this.cachesToUpdateInternal.add(cachedRecordStore); - } - } - - /** - * Get the record store in this State Store for a given interface. - * - * @param recordStoreClass Class of the record store. - * @return Registered record store or null if not found. - */ - public <T extends RecordStore<?>> T getRegisteredRecordStore( - final Class<T> recordStoreClass) { - for (RecordStore<? extends BaseRecord> recordStore : - this.recordStores.values()) { - if (recordStoreClass.isInstance(recordStore)) { - @SuppressWarnings("unchecked") - T recordStoreChecked = (T) recordStore; - return recordStoreChecked; - } - } - return null; - } - - /** - * List of records supported by this State Store. - * - * @return List of supported record classes. - */ - public Collection<Class<? extends BaseRecord>> getSupportedRecords() { - return this.recordStores.keySet(); - } - - /** - * Load the State Store driver. If successful, refresh cached data tables. - */ - public void loadDriver() { - synchronized (this.driver) { - if (!isDriverReady()) { - String driverName = this.driver.getClass().getSimpleName(); - if (this.driver.init( - conf, getIdentifier(), getSupportedRecords(), metrics)) { - LOG.info("Connection to the State Store driver {} is open and ready", - driverName); - this.refreshCaches(); - } else { - LOG.error("Cannot initialize State Store driver {}", driverName); - } - } - } - } - - /** - * Check if the driver is ready to be used. - * - * @return If the driver is ready. - */ - public boolean isDriverReady() { - return this.driver.isDriverReady(); - } - - /** - * Manually shuts down the driver. - * - * @throws Exception If the driver cannot be closed. - */ - @VisibleForTesting - public void closeDriver() throws Exception { - if (this.driver != null) { - this.driver.close(); - } - } - - /** - * Get the state store driver. - * - * @return State store driver. - */ - public StateStoreDriver getDriver() { - return this.driver; - } - - /** - * Fetch a unique identifier for this state store instance. Typically it is - * the address of the router. - * - * @return Unique identifier for this store. - */ - public String getIdentifier() { - return this.identifier; - } - - /** - * Set a unique synchronization identifier for this store. - * - * @param id Unique identifier, typically the router's RPC address. - */ - public void setIdentifier(String id) { - this.identifier = id; - } - - // - // Cached state store data - // - /** - * The last time the state store cache was fully updated. - * - * @return Timestamp. - */ - public long getCacheUpdateTime() { - return this.cacheLastUpdateTime; - } - - /** - * Stops the cache update service. - */ - @VisibleForTesting - public void stopCacheUpdateService() { - if (this.cacheUpdater != null) { - this.cacheUpdater.stop(); - removeService(this.cacheUpdater); - this.cacheUpdater = null; - } - } - - /** - * Register a cached record store for automatic periodic cache updates. - * - * @param client Client to the state store. - */ - public void registerCacheExternal(StateStoreCache client) { - this.cachesToUpdateExternal.add(client); - } - - /** - * Refresh the cache with information from the State Store. Called - * periodically by the CacheUpdateService to maintain data caches and - * versions. - */ - public void refreshCaches() { - refreshCaches(false); - } - - /** - * Refresh the cache with information from the State Store. Called - * periodically by the CacheUpdateService to maintain data caches and - * versions. - * @param force If we force the refresh. - */ - public void refreshCaches(boolean force) { - boolean success = true; - if (isDriverReady()) { - List<StateStoreCache> cachesToUpdate = new LinkedList<>(); - cachesToUpdate.addAll(cachesToUpdateInternal); - cachesToUpdate.addAll(cachesToUpdateExternal); - for (StateStoreCache cachedStore : cachesToUpdate) { - String cacheName = cachedStore.getClass().getSimpleName(); - boolean result = false; - try { - result = cachedStore.loadCache(force); - } catch (IOException e) { - LOG.error("Error updating cache for {}", cacheName, e); - result = false; - } - if (!result) { - success = false; - LOG.error("Cache update failed for cache {}", cacheName); - } - } - } else { - success = false; - LOG.info("Skipping State Store cache update, driver is not ready."); - } - if (success) { - // Uses local time, not driver time. - this.cacheLastUpdateTime = Time.now(); - } - } - - /** - * Update the cache for a specific record store. - * - * @param clazz Class of the record store. - * @return If the cached was loaded. - * @throws IOException if the cache update failed. - */ - public boolean loadCache(final Class<?> clazz) throws IOException { - return loadCache(clazz, false); - } - - /** - * Update the cache for a specific record store. - * - * @param clazz Class of the record store. - * @param force Force the update ignoring cached periods. - * @return If the cached was loaded. - * @throws IOException if the cache update failed. - */ - public boolean loadCache(Class<?> clazz, boolean force) throws IOException { - List<StateStoreCache> cachesToUpdate = - new LinkedList<StateStoreCache>(); - cachesToUpdate.addAll(this.cachesToUpdateInternal); - cachesToUpdate.addAll(this.cachesToUpdateExternal); - for (StateStoreCache cachedStore : cachesToUpdate) { - if (clazz.isInstance(cachedStore)) { - return cachedStore.loadCache(force); - } - } - throw new IOException("Registered cache was not found for " + clazz); - } - - /** - * Get the metrics for the State Store. - * - * @return State Store metrics. - */ - public StateStoreMetrics getMetrics() { - return metrics; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java deleted file mode 100644 index 4e6f8c8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.io.IOException; - -/** - * Thrown when the state store is not reachable or available. Cached APIs and - * queries may succeed. Client should retry again later. - */ -public class StateStoreUnavailableException extends IOException { - - private static final long serialVersionUID = 1L; - - public StateStoreUnavailableException(String msg) { - super(msg); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java deleted file mode 100644 index 0a36619..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.Query; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Set of utility functions used to work with the State Store. - */ -public final class StateStoreUtils { - - private static final Logger LOG = - LoggerFactory.getLogger(StateStoreUtils.class); - - - private StateStoreUtils() { - // Utility class - } - - /** - * Get the base class for a record class. If we get an implementation of a - * record we will return the real parent record class. - * - * @param clazz Class of the data record to check. - * @return Base class for the record. - */ - @SuppressWarnings("unchecked") - public static <T extends BaseRecord> - Class<? extends BaseRecord> getRecordClass(final Class<T> clazz) { - - // We ignore the Impl classes and go to the super class - Class<? extends BaseRecord> actualClazz = clazz; - while (actualClazz.getSimpleName().endsWith("Impl")) { - actualClazz = (Class<? extends BaseRecord>) actualClazz.getSuperclass(); - } - - // Check if we went too far - if (actualClazz.equals(BaseRecord.class)) { - LOG.error("We went too far ({}) with {}", actualClazz, clazz); - actualClazz = clazz; - } - return actualClazz; - } - - /** - * Get the base class for a record. If we get an implementation of a record we - * will return the real parent record class. - * - * @param record Record to check its main class. - * @return Base class for the record. - */ - public static <T extends BaseRecord> - Class<? extends BaseRecord> getRecordClass(final T record) { - return getRecordClass(record.getClass()); - } - - /** - * Get the base class name for a record. If we get an implementation of a - * record we will return the real parent record class. - * - * @param clazz Class of the data record to check. - * @return Name of the base class for the record. - */ - public static <T extends BaseRecord> String getRecordName( - final Class<T> clazz) { - return getRecordClass(clazz).getSimpleName(); - } - - /** - * Filters a list of records to find all records matching the query. - * - * @param query Map of field names and objects to use to filter results. - * @param records List of data records to filter. - * @return List of all records matching the query (or empty list if none - * match), null if the data set could not be filtered. - */ - public static <T extends BaseRecord> List<T> filterMultiple( - final Query<T> query, final Iterable<T> records) { - - List<T> matchingList = new ArrayList<>(); - for (T record : records) { - if (query.matches(record)) { - matchingList.add(record); - } - } - return matchingList; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java deleted file mode 100644 index d595a97..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store.driver; - -import java.net.InetAddress; -import java.util.Collection; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Driver class for an implementation of a {@link StateStoreService} - * provider. Driver implementations will extend this class and implement some of - * the default methods. - */ -public abstract class StateStoreDriver implements StateStoreRecordOperations { - - private static final Logger LOG = - LoggerFactory.getLogger(StateStoreDriver.class); - - - /** State Store configuration. */ - private Configuration conf; - - /** Identifier for the driver. */ - private String identifier; - - /** State Store metrics. */ - private StateStoreMetrics metrics; - - - /** - * Initialize the state store connection. - * - * @param config Configuration for the driver. - * @param id Identifier for the driver. - * @param records Records that are supported. - * @return If initialized and ready, false if failed to initialize driver. - */ - public boolean init(final Configuration config, final String id, - final Collection<Class<? extends BaseRecord>> records, - final StateStoreMetrics stateStoreMetrics) { - - this.conf = config; - this.identifier = id; - this.metrics = stateStoreMetrics; - - if (this.identifier == null) { - LOG.warn("The identifier for the State Store connection is not set"); - } - - boolean success = initDriver(); - if (!success) { - LOG.error("Cannot initialize driver for {}", getDriverName()); - return false; - } - - for (Class<? extends BaseRecord> cls : records) { - String recordString = StateStoreUtils.getRecordName(cls); - if (!initRecordStorage(recordString, cls)) { - LOG.error("Cannot initialize record store for {}", cls.getSimpleName()); - return false; - } - } - return true; - } - - /** - * Get the State Store configuration. - * - * @return Configuration for the State Store. - */ - protected Configuration getConf() { - return this.conf; - } - - /** - * Gets a unique identifier for the running task/process. Typically the - * router address. - * - * @return Unique identifier for the running task. - */ - public String getIdentifier() { - return this.identifier; - } - - /** - * Get the metrics for the State Store. - * - * @return State Store metrics. - */ - public StateStoreMetrics getMetrics() { - return this.metrics; - } - - /** - * Prepare the driver to access data storage. - * - * @return True if the driver was successfully initialized. If false is - * returned, the state store will periodically attempt to - * re-initialize the driver and the router will remain in safe mode - * until the driver is initialized. - */ - public abstract boolean initDriver(); - - /** - * Initialize storage for a single record class. - * - * @param className String reference of the record class to initialize, - * used to construct paths and file names for the record. - * Determined by configuration settings for the specific - * driver. - * @param clazz Record type corresponding to the provided name. - * @return True if successful, false otherwise. - */ - public abstract <T extends BaseRecord> boolean initRecordStorage( - String className, Class<T> clazz); - - /** - * Check if the driver is currently running and the data store connection is - * valid. - * - * @return True if the driver is initialized and the data store is ready. - */ - public abstract boolean isDriverReady(); - - /** - * Check if the driver is ready to be used and throw an exception otherwise. - * - * @throws StateStoreUnavailableException If the driver is not ready. - */ - public void verifyDriverReady() throws StateStoreUnavailableException { - if (!isDriverReady()) { - String driverName = getDriverName(); - String hostname = getHostname(); - throw new StateStoreUnavailableException("State Store driver " + - driverName + " in " + hostname + " is not ready."); - } - } - - /** - * Close the State Store driver connection. - */ - public abstract void close() throws Exception; - - /** - * Returns the current time synchronization from the underlying store. - * Override for stores that supply a current date. The data store driver is - * responsible for maintaining the official synchronization time/date for all - * distributed components. - * - * @return Current time stamp, used for all synchronization dates. - */ - public long getTime() { - return Time.now(); - } - - /** - * Get the name of the driver implementation for debugging. - * - * @return Name of the driver implementation. - */ - private String getDriverName() { - return this.getClass().getSimpleName(); - } - - /** - * Get the host name of the machine running the driver for debugging. - * - * @return Host name of the machine running the driver. - */ - private String getHostname() { - String hostname = "Unknown"; - try { - hostname = InetAddress.getLocalHost().getHostName(); - } catch (Exception e) { - LOG.error("Cannot get local address", e); - } - return hostname; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java deleted file mode 100644 index 443d46e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store.driver; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.Query; -import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; -import org.apache.hadoop.io.retry.AtMostOnce; -import org.apache.hadoop.io.retry.Idempotent; - -/** - * Operations for a driver to manage records in the State Store. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface StateStoreRecordOperations { - - /** - * Get all records of the requested record class from the data store. To use - * the default implementations in this class, getAll must return new instances - * of the records on each call. It is recommended to override the default - * implementations for better performance. - * - * @param clazz Class of record to fetch. - * @return List of all records that match the clazz. - * @throws IOException Throws exception if unable to query the data store. - */ - @Idempotent - <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException; - - /** - * Get a single record from the store that matches the query. - * - * @param clazz Class of record to fetch. - * @param query Query to filter results. - * @return A single record matching the query. Null if there are no matching - * records or more than one matching record in the store. - * @throws IOException If multiple records match or if the data store cannot - * be queried. - */ - @Idempotent - <T extends BaseRecord> T get(Class<T> clazz, Query<T> query) - throws IOException; - - /** - * Get multiple records from the store that match a query. This method - * assumes the underlying driver does not support filtering. If the driver - * supports filtering it should overwrite this method. - * - * @param clazz Class of record to fetch. - * @param query Query to filter results. - * @return Records of type clazz that match the query or empty list if none - * are found. - * @throws IOException Throws exception if unable to query the data store. - */ - @Idempotent - <T extends BaseRecord> List<T> getMultiple( - Class<T> clazz, Query<T> query) throws IOException; - - /** - * Creates a single record. Optionally updates an existing record with same - * primary key. - * - * @param record The record to insert or update. - * @param allowUpdate True if update of exiting record is allowed. - * @param errorIfExists True if an error should be returned when inserting - * an existing record. Only used if allowUpdate = false. - * @return True if the operation was successful. - * - * @throws IOException Throws exception if unable to query the data store. - */ - @AtMostOnce - <T extends BaseRecord> boolean put( - T record, boolean allowUpdate, boolean errorIfExists) throws IOException; - - /** - * Creates multiple records. Optionally updates existing records that have - * the same primary key. - * - * @param records List of data records to update or create. All records must - * be of class clazz. - * @param clazz Record class of records. - * @param allowUpdate True if update of exiting record is allowed. - * @param errorIfExists True if an error should be returned when inserting - * an existing record. Only used if allowUpdate = false. - * @return true if all operations were successful. - * - * @throws IOException Throws exception if unable to query the data store. - */ - @AtMostOnce - <T extends BaseRecord> boolean putAll( - List<T> records, boolean allowUpdate, boolean errorIfExists) - throws IOException; - - /** - * Remove a single record. - * - * @param record Record to be removed. - * @return true If the record was successfully removed. False if the record - * could not be removed or not stored. - * @throws IOException Throws exception if unable to query the data store. - */ - @AtMostOnce - <T extends BaseRecord> boolean remove(T record) throws IOException; - - /** - * Remove all records of this class from the store. - * - * @param clazz Class of records to remove. - * @return True if successful. - * @throws IOException Throws exception if unable to query the data store. - */ - @AtMostOnce - <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException; - - /** - * Remove multiple records of a specific class that match a query. Requires - * the getAll implementation to fetch fresh records on each call. - * - * @param query Query to filter what to remove. - * @return The number of records removed. - * @throws IOException Throws exception if unable to query the data store. - */ - @AtMostOnce - <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) - throws IOException; - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java deleted file mode 100644 index 8540405..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store.driver; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * Serializer to store and retrieve data in the State Store. - */ -public abstract class StateStoreSerializer { - - /** Singleton for the serializer instance. */ - private static StateStoreSerializer defaultSerializer; - - /** - * Get the default serializer based. - * @return Singleton serializer. - */ - public static StateStoreSerializer getSerializer() { - return getSerializer(null); - } - - /** - * Get a serializer based on the provided configuration. - * @param conf Configuration. Default if null. - * @return Singleton serializer. - */ - public static StateStoreSerializer getSerializer(Configuration conf) { - if (conf == null) { - synchronized (StateStoreSerializer.class) { - if (defaultSerializer == null) { - conf = new Configuration(); - defaultSerializer = newSerializer(conf); - } - } - return defaultSerializer; - } else { - return newSerializer(conf); - } - } - - private static StateStoreSerializer newSerializer(final Configuration conf) { - Class<? extends StateStoreSerializer> serializerName = conf.getClass( - DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS, - DFSConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT, - StateStoreSerializer.class); - return ReflectionUtils.newInstance(serializerName, conf); - } - - /** - * Create a new record. - * @param clazz Class of the new record. - * @return New record. - */ - public static <T> T newRecord(Class<T> clazz) { - return getSerializer(null).newRecordInstance(clazz); - } - - /** - * Create a new record. - * @param clazz Class of the new record. - * @return New record. - */ - public abstract <T> T newRecordInstance(Class<T> clazz); - - /** - * Serialize a record into a byte array. - * @param record Record to serialize. - * @return Byte array with the serialized record. - */ - public abstract byte[] serialize(BaseRecord record); - - /** - * Serialize a record into a string. - * @param record Record to serialize. - * @return String with the serialized record. - */ - public abstract String serializeString(BaseRecord record); - - /** - * Deserialize a bytes array into a record. - * @param byteArray Byte array to deserialize. - * @param clazz Class of the record. - * @return New record. - * @throws IOException If it cannot deserialize the record. - */ - public abstract <T extends BaseRecord> T deserialize( - byte[] byteArray, Class<T> clazz) throws IOException; - - /** - * Deserialize a string into a record. - * @param data String with the data to deserialize. - * @param clazz Class of the record. - * @return New record. - * @throws IOException If it cannot deserialize the record. - */ - public abstract <T extends BaseRecord> T deserialize( - String data, Class<T> clazz) throws IOException; -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org