http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java deleted file mode 100644 index 1bd35f2..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store.driver.impl; - -import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.Query; -import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; - -/** - * Base implementation of a State Store driver. It contains default - * implementations for the optional functions. These implementations use an - * uncached read/write all algorithm for all changes. In most cases it is - * recommended to override the optional functions. - * <p> - * Drivers may optionally override additional routines for performance - * optimization, such as custom get/put/remove queries, depending on the - * capabilities of the data store. 
- * <p> - */ -public abstract class StateStoreBaseImpl extends StateStoreDriver { - - @Override - public <T extends BaseRecord> T get( - Class<T> clazz, Query<T> query) throws IOException { - List<T> records = getMultiple(clazz, query); - if (records.size() > 1) { - throw new IOException("Found more than one object in collection"); - } else if (records.size() == 1) { - return records.get(0); - } else { - return null; - } - } - - @Override - public <T extends BaseRecord> List<T> getMultiple( - Class<T> clazz, Query<T> query) throws IOException { - QueryResult<T> result = get(clazz); - List<T> records = result.getRecords(); - List<T> ret = filterMultiple(query, records); - if (ret == null) { - throw new IOException("Cannot fetch records from the store"); - } - return ret; - } - - @Override - public <T extends BaseRecord> boolean put( - T record, boolean allowUpdate, boolean errorIfExists) throws IOException { - List<T> singletonList = new ArrayList<>(); - singletonList.add(record); - return putAll(singletonList, allowUpdate, errorIfExists); - } - - @Override - public <T extends BaseRecord> boolean remove(T record) throws IOException { - final Query<T> query = new Query<T>(record); - Class<? extends BaseRecord> clazz = record.getClass(); - @SuppressWarnings("unchecked") - Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz); - return remove(recordClass, query) == 1; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java deleted file mode 100644 index 6638d1c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java +++ /dev/null @@ -1,462 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;

import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.util.Time.now;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

/**
 * {@link StateStoreDriver} implementation based on files. In this approach, we
 * use temporary files for the writes and renaming "atomically" to the final
 * value. Instead of writing to the final location, it will go to a temporary
 * one and then rename to the final destination. A temporary file is only
 * renamed over the record once it has been written and closed successfully.
 */
public abstract class StateStoreFileBaseImpl
    extends StateStoreSerializableImpl {

  private static final Logger LOG =
      LoggerFactory.getLogger(StateStoreFileBaseImpl.class);

  /** File extension for temporary files. */
  private static final String TMP_MARK = ".tmp";
  /** We remove temporary files older than 10 seconds. */
  private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
  /** File pattern for temporary records: file.XYZ.tmp. */
  private static final Pattern OLD_TMP_RECORD_PATTERN =
      Pattern.compile(".+\\.(\\d+)\\.tmp");

  /** If it is initialized. */
  private boolean initialized = false;


  /**
   * Get the reader of a record for the file system.
   *
   * @param path Path of the record to read.
   * @return Reader for the record. Callers in this class treat a null
   *         reader as "could not be opened".
   */
  protected abstract <T extends BaseRecord> BufferedReader getReader(
      String path);

  /**
   * Get the writer of a record for the file system.
   *
   * @param path Path of the record to write.
   * @return Writer for the record. Callers in this class treat a null
   *         writer as "could not be opened".
   */
  protected abstract <T extends BaseRecord> BufferedWriter getWriter(
      String path);

  /**
   * Check if a path exists.
   *
   * @param path Path to check.
   * @return If the path exists.
   */
  protected abstract boolean exists(String path);

  /**
   * Make a directory.
   *
   * @param path Path of the directory to create.
   * @return If the directory was created.
   */
  protected abstract boolean mkdir(String path);

  /**
   * Rename a file. This should be atomic.
   *
   * @param src Source name.
   * @param dst Destination name.
   * @return If the rename was successful.
   */
  protected abstract boolean rename(String src, String dst);

  /**
   * Remove a file.
   *
   * @param path Path for the file to remove
   * @return If the file was removed.
   */
  protected abstract boolean remove(String path);

  /**
   * Get the children for a path.
   *
   * @param path Path to check.
   * @return List of children.
   */
  protected abstract List<String> getChildren(String path);

  /**
   * Get root directory.
   *
   * @return Root directory.
   */
  protected abstract String getRootDir();

  /**
   * Set the driver as initialized.
   *
   * @param ini If the driver is initialized.
   */
  public void setInitialized(boolean ini) {
    this.initialized = ini;
  }

  /**
   * Initialize the driver by making sure the root directory exists,
   * creating it when it does not.
   *
   * @return If the root directory is available; false when it is null,
   *         cannot be created or checking it throws.
   */
  @Override
  public boolean initDriver() {
    String rootDir = getRootDir();
    try {
      if (rootDir == null) {
        LOG.error("Invalid root directory, unable to initialize driver.");
        return false;
      }

      // Check root path
      if (!exists(rootDir)) {
        if (!mkdir(rootDir)) {
          LOG.error("Cannot create State Store root directory {}", rootDir);
          return false;
        }
      }
    } catch (Exception ex) {
      LOG.error(
          "Cannot initialize filesystem using root directory {}", rootDir, ex);
      return false;
    }
    setInitialized(true);
    return true;
  }

  /**
   * Make sure the data directory for a record type exists under the root
   * directory, creating it when it does not.
   *
   * @param className Name of the directory for the record type.
   * @param recordClass Class of the record; not used by this method.
   * @return If the data directory is available.
   */
  @Override
  public <T extends BaseRecord> boolean initRecordStorage(
      String className, Class<T> recordClass) {

    String dataDirPath = getRootDir() + "/" + className;
    try {
      // Create data directories for files
      if (!exists(dataDirPath)) {
        LOG.info("{} data directory doesn't exist, creating it", dataDirPath);
        if (!mkdir(dataDirPath)) {
          LOG.error("Cannot create data directory {}", dataDirPath);
          return false;
        }
      }
    } catch (Exception ex) {
      LOG.error("Cannot create data directory {}", dataDirPath, ex);
      return false;
    }
    return true;
  }

  /**
   * Read every record of a class. Temporary files found in the directory
   * are skipped, and removed when they are old.
   *
   * @param clazz Class of the records to read.
   * @return Records read together with the driver time.
   * @throws IOException If any record cannot be read.
   */
  @Override
  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
      throws IOException {
    verifyDriverReady();
    long start = monotonicNow();
    StateStoreMetrics metrics = getMetrics();
    List<T> ret = new ArrayList<>();
    try {
      String path = getPathForClass(clazz);
      List<String> children = getChildren(path);
      for (String child : children) {
        String pathRecord = path + "/" + child;
        if (child.endsWith(TMP_MARK)) {
          LOG.debug("There is a temporary file {} in {}", child, path);
          if (isOldTempRecord(child)) {
            LOG.warn("Removing {} as it's an old temporary record", child);
            remove(pathRecord);
          }
        } else {
          T record = getRecord(pathRecord, clazz);
          ret.add(record);
        }
      }
    } catch (Exception e) {
      if (metrics != null) {
        metrics.addFailure(monotonicNow() - start);
      }
      String msg = "Cannot fetch records for " + clazz.getSimpleName();
      LOG.error(msg, e);
      throw new IOException(msg, e);
    }

    if (metrics != null) {
      metrics.addRead(monotonicNow() - start);
    }
    return new QueryResult<T>(ret, getTime());
  }

  /**
   * Check if a record is temporary and old.
   *
   * @param pathRecord Path for the record to check.
   * @return If the record is temporary and old. A temporary name whose
   *         timestamp cannot be parsed is not considered old.
   */
  @VisibleForTesting
  public static boolean isOldTempRecord(final String pathRecord) {
    if (!pathRecord.endsWith(TMP_MARK)) {
      return false;
    }
    // Extract temporary record creation time
    Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
    if (m.find()) {
      try {
        long time = Long.parseLong(m.group(1));
        return now() - time > OLD_TMP_RECORD_MS;
      } catch (NumberFormatException e) {
        // The digit run does not fit in a long, so this is not one of our
        // timestamps; leave the file alone rather than failing the caller.
        return false;
      }
    }
    return false;
  }

  /**
   * Read a record from a file. The first non-empty line that does not start
   * with '#' and can be parsed is returned.
   *
   * @param path Path to the file containing the record.
   * @param clazz Class of the record.
   * @return Record read from the file.
   * @throws IOException If the file cannot be opened or read, or it has no
   *           parsable line.
   */
  private <T extends BaseRecord> T getRecord(
      final String path, final Class<T> clazz) throws IOException {
    // A null resource is legal in try-with-resources (close is skipped), so
    // the null check can live inside the block.
    try (BufferedReader reader = getReader(path)) {
      if (reader == null) {
        throw new IOException("Cannot open " + path + " for record " +
            clazz.getSimpleName());
      }
      String line;
      while ((line = reader.readLine()) != null) {
        if (!line.startsWith("#") && line.length() > 0) {
          try {
            T record = newRecord(line, clazz, false);
            return record;
          } catch (Exception ex) {
            // Keep scanning: a later line may still hold a valid record
            LOG.error("Cannot parse line {} in file {}", line, path, ex);
          }
        }
      }
    }
    throw new IOException("Cannot read " + path + " for record " +
        clazz.getSimpleName());
  }

  /**
   * Get the path for a record class.
   * @param clazz Class of the record.
   * @return Path for this record class.
   */
  private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) {
    String className = StateStoreUtils.getRecordName(clazz);
    StringBuilder sb = new StringBuilder();
    sb.append(getRootDir());
    if (sb.charAt(sb.length() - 1) != '/') {
      sb.append("/");
    }
    sb.append(className);
    return sb.toString();
  }

  @Override
  public boolean isDriverReady() {
    return this.initialized;
  }

  /**
   * Write a list of records. Each record goes to a temporary file first and
   * is renamed to its final path only after the temporary file was written
   * and closed without errors.
   *
   * @param records Records to write.
   * @param allowUpdate If records that already exist are overwritten.
   * @param errorIfExists If finding an existing record (when updates are
   *          not allowed) aborts the operation.
   * @return If every record that had to be written was committed.
   * @throws StateStoreUnavailableException If the driver is not ready.
   */
  @Override
  public <T extends BaseRecord> boolean putAll(
      List<T> records, boolean allowUpdate, boolean errorIfExists)
          throws StateStoreUnavailableException {
    verifyDriverReady();
    if (records.isEmpty()) {
      return true;
    }

    long start = monotonicNow();
    StateStoreMetrics metrics = getMetrics();

    // Check if any record exists
    Map<String, T> toWrite = new HashMap<>();
    for (T record : records) {
      Class<? extends BaseRecord> recordClass = record.getClass();
      String path = getPathForClass(recordClass);
      String primaryKey = getPrimaryKey(record);
      String recordPath = path + "/" + primaryKey;

      if (exists(recordPath)) {
        if (allowUpdate) {
          // Update the mod time stamp. Many backends will use their
          // own timestamp for the mod time.
          record.setDateModified(this.getTime());
          toWrite.put(recordPath, record);
        } else if (errorIfExists) {
          LOG.error("Attempt to insert record {} that already exists",
              recordPath);
          if (metrics != null) {
            metrics.addFailure(monotonicNow() - start);
          }
          return false;
        } else {
          LOG.debug("Not updating {}", record);
        }
      } else {
        toWrite.put(recordPath, record);
      }
    }

    // Write the records
    boolean success = true;
    for (Entry<String, T> entry : toWrite.entrySet()) {
      String recordPath = entry.getKey();
      String recordPathTemp = recordPath + "." + now() + TMP_MARK;
      if (writeTempRecord(recordPathTemp, entry.getValue())) {
        // Commit
        if (!rename(recordPathTemp, recordPath)) {
          LOG.error("Failed committing record into {}", recordPath);
          success = false;
        }
      } else {
        success = false;
        // Never promote a missing or partially written temporary file over
        // the last good copy of the record; clean it up instead.
        if (exists(recordPathTemp) && !remove(recordPathTemp)) {
          LOG.warn("Cannot remove temporary record {}", recordPathTemp);
        }
      }
    }

    long end = monotonicNow();
    if (metrics != null) {
      if (success) {
        metrics.addWrite(end - start);
      } else {
        metrics.addFailure(end - start);
      }
    }
    return success;
  }

  /**
   * Serialize a record into a temporary file.
   *
   * @param recordPathTemp Path of the temporary file.
   * @param record Record to serialize.
   * @return If the writer could be opened and the record was written and
   *         the writer closed without errors.
   */
  private <T extends BaseRecord> boolean writeTempRecord(
      final String recordPathTemp, final T record) {
    // A failure in the implicit close() (where the buffer is flushed) is
    // also routed to the catch clause below and reported as a failed write.
    try (BufferedWriter writer = getWriter(recordPathTemp)) {
      if (writer == null) {
        LOG.error("Cannot open the writer for {}", recordPathTemp);
        return false;
      }
      String line = serializeString(record);
      writer.write(line);
      return true;
    } catch (IOException e) {
      LOG.error("Cannot write {}", recordPathTemp, e);
      return false;
    }
  }

  /**
   * Remove the records of a class that match a query.
   *
   * @param clazz Class of the records to remove.
   * @param query Query the records must match; a null query removes
   *          nothing.
   * @return Number of records removed.
   * @throws StateStoreUnavailableException If the driver is not ready.
   */
  @Override
  public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
      throws StateStoreUnavailableException {
    verifyDriverReady();

    if (query == null) {
      return 0;
    }

    long start = Time.monotonicNow();
    StateStoreMetrics metrics = getMetrics();
    int removed = 0;
    // Get the current records
    try {
      final QueryResult<T> result = get(clazz);
      final List<T> existingRecords = result.getRecords();
      // Select the existing records that match the query and delete them
      final List<T> recordsToRemove = filterMultiple(query, existingRecords);
      boolean success = true;
      for (T recordToRemove : recordsToRemove) {
        String path = getPathForClass(clazz);
        String primaryKey = getPrimaryKey(recordToRemove);
        String recordToRemovePath = path + "/" + primaryKey;
        if (remove(recordToRemovePath)) {
          removed++;
        } else {
          LOG.error("Cannot remove record {}", recordToRemovePath);
          success = false;
        }
      }
      if (!success) {
        LOG.error("Cannot remove records {} query {}", clazz, query);
        if (metrics != null) {
          metrics.addFailure(monotonicNow() - start);
        }
      }
    } catch (IOException e) {
      LOG.error("Cannot remove records {} query {}", clazz, query, e);
      if (metrics != null) {
        metrics.addFailure(monotonicNow() - start);
      }
    }

    if (removed > 0 && metrics != null) {
      metrics.addRemove(monotonicNow() - start);
    }
    return removed;
  }

  /**
   * Remove every file in the directory of a record class.
   *
   * @param clazz Class of the records to remove.
   * @return If every file was removed.
   * @throws StateStoreUnavailableException If the driver is not ready.
   */
  @Override
  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
      throws StateStoreUnavailableException {
    verifyDriverReady();
    long start = Time.monotonicNow();
    StateStoreMetrics metrics = getMetrics();

    boolean success = true;
    String path = getPathForClass(clazz);
    List<String> children = getChildren(path);
    for (String child : children) {
      String pathRecord = path + "/" + child;
      if (!remove(pathRecord)) {
        success = false;
      }
    }

    if (metrics != null) {
      long time = Time.monotonicNow() - start;
      if (success) {
        metrics.addRemove(time);
      } else {
        metrics.addFailure(time);
      }
    }
    return success;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java deleted file mode 100644 index c585a23..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store.driver.impl; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; -import java.util.LinkedList; -import java.util.List; - -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.io.Files; - -/** - * StateStoreDriver implementation based on a local file. - */ -public class StateStoreFileImpl extends StateStoreFileBaseImpl { - - private static final Logger LOG = - LoggerFactory.getLogger(StateStoreFileImpl.class); - - /** Configuration keys. */ - public static final String FEDERATION_STORE_FILE_DIRECTORY = - DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory"; - - /** Root directory for the state store. 
*/ - private String rootDirectory; - - - @Override - protected boolean exists(String path) { - File test = new File(path); - return test.exists(); - } - - @Override - protected boolean mkdir(String path) { - File dir = new File(path); - return dir.mkdirs(); - } - - @Override - protected boolean rename(String src, String dst) { - try { - Files.move(new File(src), new File(dst)); - return true; - } catch (IOException e) { - LOG.error("Cannot rename {} to {}", src, dst, e); - return false; - } - } - - @Override - protected boolean remove(String path) { - File file = new File(path); - return file.delete(); - } - - @Override - protected String getRootDir() { - if (this.rootDirectory == null) { - String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY); - if (dir == null) { - File tempDir = Files.createTempDir(); - dir = tempDir.getAbsolutePath(); - LOG.warn("The root directory is not available, using {}", dir); - } - this.rootDirectory = dir; - } - return this.rootDirectory; - } - - @Override - protected <T extends BaseRecord> BufferedReader getReader(String filename) { - BufferedReader reader = null; - try { - LOG.debug("Loading file: {}", filename); - File file = new File(filename); - FileInputStream fis = new FileInputStream(file); - InputStreamReader isr = - new InputStreamReader(fis, StandardCharsets.UTF_8); - reader = new BufferedReader(isr); - } catch (Exception ex) { - LOG.error("Cannot open read stream for record {}", filename, ex); - } - return reader; - } - - @Override - protected <T extends BaseRecord> BufferedWriter getWriter(String filename) { - BufferedWriter writer = null; - try { - LOG.debug("Writing file: {}", filename); - File file = new File(filename); - FileOutputStream fos = new FileOutputStream(file, false); - OutputStreamWriter osw = - new OutputStreamWriter(fos, StandardCharsets.UTF_8); - writer = new BufferedWriter(osw); - } catch (IOException e) { - LOG.error("Cannot open write stream for record {}", filename, e); - } - return writer; - } - 
- @Override - public void close() throws Exception { - setInitialized(false); - } - - @Override - protected List<String> getChildren(String path) { - List<String> ret = new LinkedList<>(); - File dir = new File(path); - File[] files = dir.listFiles(); - if (files != null) { - for (File file : files) { - String filename = file.getName(); - ret.add(filename); - } - } - return ret; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java deleted file mode 100644 index 8d6c626..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@code StateStoreDriver} implementation based on a filesystem. The most
 * common use is HDFS as a backend.
 */
public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {

  private static final Logger LOG =
      LoggerFactory.getLogger(StateStoreFileSystemImpl.class);


  /** Configuration keys. */
  public static final String FEDERATION_STORE_FS_PATH =
      DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path";

  /** File system to back the State Store. */
  private FileSystem fs;
  /** Working path in the filesystem. */
  private String workPath;

  /**
   * Check if a path exists in the filesystem.
   *
   * @param path Path to check.
   * @return If the path exists; false when the check itself fails.
   */
  @Override
  protected boolean exists(String path) {
    try {
      return fs.exists(new Path(path));
    } catch (IOException e) {
      // Report the failure instead of silently claiming "does not exist"
      LOG.error("Cannot check if {} exists", path, e);
      return false;
    }
  }

  /**
   * Create a directory, including missing parents.
   *
   * @param path Path of the directory to create.
   * @return If the directory was created; false when the call fails.
   */
  @Override
  protected boolean mkdir(String path) {
    try {
      return fs.mkdirs(new Path(path));
    } catch (IOException e) {
      LOG.error("Cannot create directory {}", path, e);
      return false;
    }
  }

  /**
   * Rename a file over its destination. On a
   * {@link DistributedFileSystem} the overwriting rename is used; on any
   * other filesystem the destination is deleted first and then the source
   * is renamed, which is two separate operations.
   *
   * @param src Source name.
   * @param dst Destination name.
   * @return If the rename was successful.
   */
  @Override
  protected boolean rename(String src, String dst) {
    try {
      if (fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem)fs;
        dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE);
        return true;
      } else {
        // Replace should be atomic but not available
        if (fs.exists(new Path(dst))) {
          fs.delete(new Path(dst), true);
        }
        return fs.rename(new Path(src), new Path(dst));
      }
    } catch (Exception e) {
      LOG.error("Cannot rename {} to {}", src, dst, e);
      return false;
    }
  }

  /**
   * Remove a path recursively.
   *
   * @param path Path to remove.
   * @return If the path was removed; false when the call fails.
   */
  @Override
  protected boolean remove(String path) {
    try {
      return fs.delete(new Path(path), true);
    } catch (Exception e) {
      LOG.error("Cannot remove {}", path, e);
      return false;
    }
  }

  /**
   * Resolve the working path from the configuration and open the
   * filesystem that backs it. The result is remembered for later calls.
   *
   * @return Working path, or null when it is not configured or the
   *         filesystem cannot be obtained.
   */
  @Override
  protected String getRootDir() {
    if (this.workPath == null) {
      String rootPath = getConf().get(FEDERATION_STORE_FS_PATH);
      if (rootPath == null) {
        // Explicit check instead of relying on a caught NPE from new URI()
        LOG.error("The State Store path {} is not configured",
            FEDERATION_STORE_FS_PATH);
        return null;
      }
      URI workUri;
      try {
        workUri = new URI(rootPath);
        fs = FileSystem.get(workUri, getConf());
      } catch (Exception ex) {
        LOG.error("Cannot access the State Store filesystem at {}",
            rootPath, ex);
        return null;
      }
      this.workPath = rootPath;
    }
    return this.workPath;
  }

  /**
   * Close the filesystem and mark the driver as not initialized, so that
   * {@code isDriverReady()} stops reporting a driver whose filesystem has
   * been closed.
   *
   * @throws Exception If the filesystem cannot be closed.
   */
  @Override
  public void close() throws Exception {
    try {
      if (fs != null) {
        fs.close();
      }
    } finally {
      setInitialized(false);
    }
  }

  /**
   * Open a UTF-8 reader over a record file.
   *
   * @param pathName Path of the file to read.
   * @return Reader for the file or null if it cannot be opened.
   */
  @Override
  protected <T extends BaseRecord> BufferedReader getReader(String pathName) {
    BufferedReader reader = null;
    Path path = new Path(pathName);
    try {
      FSDataInputStream fdis = fs.open(path);
      InputStreamReader isr =
          new InputStreamReader(fdis, StandardCharsets.UTF_8);
      reader = new BufferedReader(isr);
    } catch (IOException ex) {
      LOG.error("Cannot open read stream for {}", path, ex);
    }
    return reader;
  }

  /**
   * Open a UTF-8 writer over a record file, overwriting it if it exists.
   *
   * @param pathName Path of the file to write.
   * @return Writer for the file or null if it cannot be opened.
   */
  @Override
  protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) {
    BufferedWriter writer = null;
    Path path = new Path(pathName);
    try {
      FSDataOutputStream fdos = fs.create(path, true);
      OutputStreamWriter osw =
          new OutputStreamWriter(fdos, StandardCharsets.UTF_8);
      writer = new BufferedWriter(osw);
    } catch (IOException ex) {
      LOG.error("Cannot open write stream for {}", path, ex);
    }
    return writer;
  }

  /**
   * List the names of the entries under a path, resolved against the
   * working path.
   *
   * @param pathName Path to list.
   * @return Entry names; empty when the listing fails.
   */
  @Override
  protected List<String> getChildren(String pathName) {
    List<String> ret = new LinkedList<>();
    Path path = new Path(workPath, pathName);
    try {
      FileStatus[] files = fs.listStatus(path);
      for (FileStatus file : files) {
        Path filePath = file.getPath();
        String fileName = filePath.getName();
        ret.add(fileName);
      }
    } catch (Exception e) {
      LOG.error("Cannot get children for {}", pathName, e);
    }
    return ret;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java deleted file mode 100644 index 7bc93de..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store.driver.impl; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; - -/** - * State Store driver that stores a serialization of the records. The serializer - * is pluggable. - */ -public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl { - - /** Mark for slashes in path names. */ - protected static final String SLASH_MARK = "0SLASH0"; - /** Mark for colon in path names. */ - protected static final String COLON_MARK = "_"; - - /** Default serializer for this driver. */ - private StateStoreSerializer serializer; - - - @Override - public boolean init(final Configuration config, final String id, - final Collection<Class<? extends BaseRecord>> records, - final StateStoreMetrics metrics) { - boolean ret = super.init(config, id, records, metrics); - - this.serializer = StateStoreSerializer.getSerializer(config); - - return ret; - } - - /** - * Serialize a record using the serializer. - * @param record Record to serialize. - * @return Byte array with the serialization of the record. 
- */ - protected <T extends BaseRecord> byte[] serialize(T record) { - return serializer.serialize(record); - } - - /** - * Serialize a record using the serializer. - * @param record Record to serialize. - * @return String with the serialization of the record. - */ - protected <T extends BaseRecord> String serializeString(T record) { - return serializer.serializeString(record); - } - - /** - * Creates a record from an input data string. - * @param data Serialized text of the record. - * @param clazz Record class. - * @param includeDates If dateModified and dateCreated are serialized. - * @return The created record. - * @throws IOException - */ - protected <T extends BaseRecord> T newRecord( - String data, Class<T> clazz, boolean includeDates) throws IOException { - return serializer.deserialize(data, clazz); - } - - /** - * Get the primary key for a record. If we don't want to store in folders, we - * need to remove / from the name. - * - * @param record Record to get the primary key for. - * @return Primary key for the record. 
- */ - protected static String getPrimaryKey(BaseRecord record) { - String primaryKey = record.getPrimaryKey(); - primaryKey = primaryKey.replaceAll("/", SLASH_MARK); - primaryKey = primaryKey.replaceAll(":", COLON_MARK); - return primaryKey; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java deleted file mode 100644 index 45c5dd6..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store.driver.impl; - -import java.io.IOException; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.binary.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; -import org.apache.hadoop.util.ReflectionUtils; - -import com.google.protobuf.Message; - -/** - * Protobuf implementation of the State Store serializer. - */ -public final class StateStoreSerializerPBImpl extends StateStoreSerializer { - - private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb"; - private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl"; - - private Configuration localConf = new Configuration(); - - - private StateStoreSerializerPBImpl() { - } - - @Override - @SuppressWarnings("unchecked") - public <T> T newRecordInstance(Class<T> clazz) { - try { - String clazzPBImpl = getPBImplClassName(clazz); - Class<?> pbClazz = localConf.getClassByName(clazzPBImpl); - Object retObject = ReflectionUtils.newInstance(pbClazz, localConf); - return (T)retObject; - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - - private String getPBImplClassName(Class<?> clazz) { - String srcPackagePart = getPackageName(clazz); - String srcClassName = getClassName(clazz); - String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX; - String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX; - return destPackagePart + "." 
+ destClassPart; - } - - private String getClassName(Class<?> clazz) { - String fqName = clazz.getName(); - return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length())); - } - - private String getPackageName(Class<?> clazz) { - return clazz.getPackage().getName(); - } - - @Override - public byte[] serialize(BaseRecord record) { - byte[] byteArray64 = null; - if (record instanceof PBRecord) { - PBRecord recordPB = (PBRecord) record; - Message msg = recordPB.getProto(); - byte[] byteArray = msg.toByteArray(); - byteArray64 = Base64.encodeBase64(byteArray, false); - } - return byteArray64; - } - - @Override - public String serializeString(BaseRecord record) { - byte[] byteArray64 = serialize(record); - String base64Encoded = StringUtils.newStringUtf8(byteArray64); - return base64Encoded; - } - - @Override - public <T extends BaseRecord> T deserialize( - byte[] byteArray, Class<T> clazz) throws IOException { - - T record = newRecord(clazz); - if (record instanceof PBRecord) { - PBRecord pbRecord = (PBRecord) record; - byte[] byteArray64 = Base64.encodeBase64(byteArray, false); - String base64Encoded = StringUtils.newStringUtf8(byteArray64); - pbRecord.readInstance(base64Encoded); - } - return record; - } - - @Override - public <T extends BaseRecord> T deserialize(String data, Class<T> clazz) - throws IOException { - byte[] byteArray64 = Base64.decodeBase64(data); - return deserialize(byteArray64, clazz); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java deleted file mode 100644 index 7f98171..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java +++ /dev/null @@ -1,328 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store.driver.impl; - -import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; -import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName; -import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; -import static org.apache.hadoop.util.Time.monotonicNow; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; -import org.apache.hadoop.hdfs.server.federation.store.records.Query; -import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; -import org.apache.hadoop.util.curator.ZKCuratorManager; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a - * backend. - * <p> - * The structure of the znodes in the ensemble is: - * PARENT_PATH - * |--- MOUNT - * |--- MEMBERSHIP - * |--- REBALANCER - * |--- ROUTERS - */ -public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { - - private static final Logger LOG = - LoggerFactory.getLogger(StateStoreZooKeeperImpl.class); - - - /** Configuration keys. */ - public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX = - DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk."; - public static final String FEDERATION_STORE_ZK_PARENT_PATH = - FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path"; - public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT = - "/hdfs-federation"; - - - /** Directory to store the state store data. 
*/ - private String baseZNode; - - /** Interface to ZooKeeper. */ - private ZKCuratorManager zkManager; - - - @Override - public boolean initDriver() { - LOG.info("Initializing ZooKeeper connection"); - - Configuration conf = getConf(); - baseZNode = conf.get( - FEDERATION_STORE_ZK_PARENT_PATH, - FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); - try { - this.zkManager = new ZKCuratorManager(conf); - this.zkManager.start(); - } catch (IOException e) { - LOG.error("Cannot initialize the ZK connection", e); - return false; - } - return true; - } - - @Override - public <T extends BaseRecord> boolean initRecordStorage( - String className, Class<T> clazz) { - try { - String checkPath = getNodePath(baseZNode, className); - zkManager.createRootDirRecursively(checkPath); - return true; - } catch (Exception e) { - LOG.error("Cannot initialize ZK node for {}: {}", - className, e.getMessage()); - return false; - } - } - - @Override - public void close() throws Exception { - if (zkManager != null) { - zkManager.close(); - } - } - - @Override - public boolean isDriverReady() { - if (zkManager == null) { - return false; - } - CuratorFramework curator = zkManager.getCurator(); - if (curator == null) { - return false; - } - return curator.getState() == CuratorFrameworkState.STARTED; - } - - @Override - public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) - throws IOException { - verifyDriverReady(); - long start = monotonicNow(); - List<T> ret = new ArrayList<>(); - String znode = getZNodeForClass(clazz); - try { - List<String> children = zkManager.getChildren(znode); - for (String child : children) { - try { - String path = getNodePath(znode, child); - Stat stat = new Stat(); - String data = zkManager.getStringData(path, stat); - boolean corrupted = false; - if (data == null || data.equals("")) { - // All records should have data, otherwise this is corrupted - corrupted = true; - } else { - try { - T record = createRecord(data, stat, clazz); - ret.add(record); - } catch 
(IOException e) { - LOG.error("Cannot create record type \"{}\" from \"{}\": {}", - clazz.getSimpleName(), data, e.getMessage()); - corrupted = true; - } - } - - if (corrupted) { - LOG.error("Cannot get data for {} at {}, cleaning corrupted data", - child, path); - zkManager.delete(path); - } - } catch (Exception e) { - LOG.error("Cannot get data for {}: {}", child, e.getMessage()); - } - } - } catch (Exception e) { - getMetrics().addFailure(monotonicNow() - start); - String msg = "Cannot get children for \"" + znode + "\": " + - e.getMessage(); - LOG.error(msg); - throw new IOException(msg); - } - long end = monotonicNow(); - getMetrics().addRead(end - start); - return new QueryResult<T>(ret, getTime()); - } - - @Override - public <T extends BaseRecord> boolean putAll( - List<T> records, boolean update, boolean error) throws IOException { - verifyDriverReady(); - if (records.isEmpty()) { - return true; - } - - // All records should be the same - T record0 = records.get(0); - Class<? extends BaseRecord> recordClass = record0.getClass(); - String znode = getZNodeForClass(recordClass); - - long start = monotonicNow(); - boolean status = true; - for (T record : records) { - String primaryKey = getPrimaryKey(record); - String recordZNode = getNodePath(znode, primaryKey); - byte[] data = serialize(record); - if (!writeNode(recordZNode, data, update, error)){ - status = false; - } - } - long end = monotonicNow(); - if (status) { - getMetrics().addWrite(end - start); - } else { - getMetrics().addFailure(end - start); - } - return status; - } - - @Override - public <T extends BaseRecord> int remove( - Class<T> clazz, Query<T> query) throws IOException { - verifyDriverReady(); - if (query == null) { - return 0; - } - - // Read the current data - long start = monotonicNow(); - List<T> records = null; - try { - QueryResult<T> result = get(clazz); - records = result.getRecords(); - } catch (IOException ex) { - LOG.error("Cannot get existing records", ex); - 
getMetrics().addFailure(monotonicNow() - start); - return 0; - } - - // Check the records to remove - String znode = getZNodeForClass(clazz); - List<T> recordsToRemove = filterMultiple(query, records); - - // Remove the records - int removed = 0; - for (T existingRecord : recordsToRemove) { - LOG.info("Removing \"{}\"", existingRecord); - try { - String primaryKey = getPrimaryKey(existingRecord); - String path = getNodePath(znode, primaryKey); - if (zkManager.delete(path)) { - removed++; - } else { - LOG.error("Did not remove \"{}\"", existingRecord); - } - } catch (Exception e) { - LOG.error("Cannot remove \"{}\"", existingRecord, e); - getMetrics().addFailure(monotonicNow() - start); - } - } - long end = monotonicNow(); - if (removed > 0) { - getMetrics().addRemove(end - start); - } - return removed; - } - - @Override - public <T extends BaseRecord> boolean removeAll(Class<T> clazz) - throws IOException { - long start = monotonicNow(); - boolean status = true; - String znode = getZNodeForClass(clazz); - LOG.info("Deleting all children under {}", znode); - try { - List<String> children = zkManager.getChildren(znode); - for (String child : children) { - String path = getNodePath(znode, child); - LOG.info("Deleting {}", path); - zkManager.delete(path); - } - } catch (Exception e) { - LOG.error("Cannot remove {}: {}", znode, e.getMessage()); - status = false; - } - long time = monotonicNow() - start; - if (status) { - getMetrics().addRemove(time); - } else { - getMetrics().addFailure(time); - } - return status; - } - - private boolean writeNode( - String znode, byte[] bytes, boolean update, boolean error) { - try { - boolean created = zkManager.create(znode); - if (!update && !created && error) { - LOG.info("Cannot write record \"{}\", it already exists", znode); - return false; - } - - // Write data - zkManager.setData(znode, bytes, -1); - return true; - } catch (Exception e) { - LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage()); - } - return false; 
- } - - /** - * Get the ZNode for a class. - * - * @param clazz Record class to evaluate. - * @return The ZNode for the class. - */ - private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) { - String className = getRecordName(clazz); - return getNodePath(baseZNode, className); - } - - /** - * Creates a record from a string returned by ZooKeeper. - * - * @param data The data to write. - * @param stat Stat of the data record to create. - * @param clazz The data record type to create. - * @return The created record. - * @throws IOException - */ - private <T extends BaseRecord> T createRecord( - String data, Stat stat, Class<T> clazz) throws IOException { - T record = newRecord(data, clazz, false); - record.setDateCreated(stat.getCtime()); - record.setDateModified(stat.getMtime()); - return record; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java deleted file mode 100644 index a18433e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementations of state store data providers/drivers. Each driver is - * responsible for maintaining, querying, updating and deleting persistent data - * records. Data records are defined as subclasses of - * {@link org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord}. - * Each driver implements the - * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver} interface. - * <p> - * Currently supported drivers: - * <ul> - * <li>{@link StateStoreFileImpl} A file-based data storage backend. - * <li>{@link StateStoreZooKeeperImpl} A zookeeper based data storage backend. 
- * </ul> - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -package org.apache.hadoop.hdfs.server.federation.store.driver.impl; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java deleted file mode 100644 index da998b5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * The state store uses modular data storage classes derived from - * StateStoreDriver to handle querying, updating and deleting data records. 
The - * data storage driver is initialized and maintained by the StateStoreService. - * The state store supports fetching all records of a type, filtering by column - * values or fetching a single record by its primary key. - * <p> - * Each data storage backend is required to implement the methods contained in - * the StateStoreDriver interface. These methods allow the querying, updating, - * inserting and deleting of data records into the state store. - */ - -@InterfaceAudience.Private -@InterfaceStability.Evolving - -package org.apache.hadoop.hdfs.server.federation.store.driver; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java deleted file mode 100644 index 57b7b61..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java +++ /dev/null @@ -1,310 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.store.impl; - -import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; -import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; -import 
org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse; -import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; -import org.apache.hadoop.hdfs.server.federation.store.records.Query; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation of the {@link MembershipStore} State Store API. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MembershipStoreImpl - extends MembershipStore implements StateStoreCache { - - private static final Logger LOG = - LoggerFactory.getLogger(MembershipStoreImpl.class); - - - /** Reported namespaces that are not decommissioned. */ - private final Set<FederationNamespaceInfo> activeNamespaces; - - /** Namenodes (after evaluating the quorum) that are active in the cluster. */ - private final Map<String, MembershipState> activeRegistrations; - /** Namenode status reports (raw) that were discarded for being too old. */ - private final Map<String, MembershipState> expiredRegistrations; - - /** Lock to access the local memory cache. 
*/ - private final ReadWriteLock cacheReadWriteLock = - new ReentrantReadWriteLock(); - private final Lock cacheReadLock = cacheReadWriteLock.readLock(); - private final Lock cacheWriteLock = cacheReadWriteLock.writeLock(); - - - public MembershipStoreImpl(StateStoreDriver driver) { - super(driver); - - this.activeRegistrations = new HashMap<>(); - this.expiredRegistrations = new HashMap<>(); - this.activeNamespaces = new TreeSet<>(); - } - - @Override - public GetNamenodeRegistrationsResponse getExpiredNamenodeRegistrations( - GetNamenodeRegistrationsRequest request) throws IOException { - - GetNamenodeRegistrationsResponse response = - GetNamenodeRegistrationsResponse.newInstance(); - cacheReadLock.lock(); - try { - Collection<MembershipState> vals = this.expiredRegistrations.values(); - List<MembershipState> copyVals = new ArrayList<>(vals); - response.setNamenodeMemberships(copyVals); - } finally { - cacheReadLock.unlock(); - } - return response; - } - - @Override - public GetNamespaceInfoResponse getNamespaceInfo( - GetNamespaceInfoRequest request) throws IOException { - - Set<FederationNamespaceInfo> namespaces = new HashSet<>(); - try { - cacheReadLock.lock(); - namespaces.addAll(activeNamespaces); - } finally { - cacheReadLock.unlock(); - } - - GetNamespaceInfoResponse response = - GetNamespaceInfoResponse.newInstance(namespaces); - return response; - } - - @Override - public GetNamenodeRegistrationsResponse getNamenodeRegistrations( - final GetNamenodeRegistrationsRequest request) throws IOException { - - // TODO Cache some common queries and sorts - List<MembershipState> ret = null; - - cacheReadLock.lock(); - try { - Collection<MembershipState> registrations = activeRegistrations.values(); - MembershipState partialMembership = request.getPartialMembership(); - if (partialMembership == null) { - ret = new ArrayList<>(registrations); - } else { - Query<MembershipState> query = new Query<>(partialMembership); - ret = filterMultiple(query, registrations); - 
} - } finally { - cacheReadLock.unlock(); - } - // Sort in ascending update date order - Collections.sort(ret); - - GetNamenodeRegistrationsResponse response = - GetNamenodeRegistrationsResponse.newInstance(ret); - return response; - } - - @Override - public NamenodeHeartbeatResponse namenodeHeartbeat( - NamenodeHeartbeatRequest request) throws IOException { - - MembershipState record = request.getNamenodeMembership(); - String nnId = record.getNamenodeKey(); - MembershipState existingEntry = null; - cacheReadLock.lock(); - try { - existingEntry = this.activeRegistrations.get(nnId); - } finally { - cacheReadLock.unlock(); - } - - if (existingEntry != null) { - if (existingEntry.getState() != record.getState()) { - LOG.info("NN registration state has changed: {} -> {}", - existingEntry, record); - } else { - LOG.debug("Updating NN registration: {} -> {}", existingEntry, record); - } - } else { - LOG.info("Inserting new NN registration: {}", record); - } - - boolean status = getDriver().put(record, true, false); - - NamenodeHeartbeatResponse response = - NamenodeHeartbeatResponse.newInstance(status); - return response; - } - - @Override - public boolean loadCache(boolean force) throws IOException { - super.loadCache(force); - - // Update local cache atomically - cacheWriteLock.lock(); - try { - this.activeRegistrations.clear(); - this.expiredRegistrations.clear(); - this.activeNamespaces.clear(); - - // Build list of NN registrations: nnId -> registration list - Map<String, List<MembershipState>> nnRegistrations = new HashMap<>(); - List<MembershipState> cachedRecords = getCachedRecords(); - for (MembershipState membership : cachedRecords) { - String nnId = membership.getNamenodeKey(); - if (membership.getState() == FederationNamenodeServiceState.EXPIRED) { - // Expired, RPC service does not use these - String key = membership.getPrimaryKey(); - this.expiredRegistrations.put(key, membership); - } else { - // This is a valid NN registration, build a list of all 
registrations - // using the NN id to use for the quorum calculation. - List<MembershipState> nnRegistration = - nnRegistrations.get(nnId); - if (nnRegistration == null) { - nnRegistration = new LinkedList<>(); - nnRegistrations.put(nnId, nnRegistration); - } - nnRegistration.add(membership); - String bpId = membership.getBlockPoolId(); - String cId = membership.getClusterId(); - String nsId = membership.getNameserviceId(); - FederationNamespaceInfo nsInfo = - new FederationNamespaceInfo(bpId, cId, nsId); - this.activeNamespaces.add(nsInfo); - } - } - - // Calculate most representative entry for each active NN id - for (List<MembershipState> nnRegistration : nnRegistrations.values()) { - // Run quorum based on NN state - MembershipState representativeRecord = - getRepresentativeQuorum(nnRegistration); - String nnKey = representativeRecord.getNamenodeKey(); - this.activeRegistrations.put(nnKey, representativeRecord); - } - LOG.debug("Refreshed {} NN registrations from State Store", - cachedRecords.size()); - } finally { - cacheWriteLock.unlock(); - } - return true; - } - - @Override - public UpdateNamenodeRegistrationResponse updateNamenodeRegistration( - UpdateNamenodeRegistrationRequest request) throws IOException { - - boolean status = false; - cacheWriteLock.lock(); - try { - String namenode = MembershipState.getNamenodeKey( - request.getNameserviceId(), request.getNamenodeId()); - MembershipState member = this.activeRegistrations.get(namenode); - if (member != null) { - member.setState(request.getState()); - status = true; - } - } finally { - cacheWriteLock.unlock(); - } - UpdateNamenodeRegistrationResponse response = - UpdateNamenodeRegistrationResponse.newInstance(status); - return response; - } - - /** - * Picks the most recent entry in the subset that is most agreeable on the - * specified field. 
1) If a majority of the collection has the same value for - * the field, the first sorted entry within the subset the matches the - * majority value 2) Otherwise the first sorted entry in the set of all - * entries - * - * @param records - Collection of state store record objects of the same type - * @return record that is most representative of the field name - */ - private MembershipState getRepresentativeQuorum( - Collection<MembershipState> records) { - - // Collate objects by field value: field value -> order set of records - Map<FederationNamenodeServiceState, TreeSet<MembershipState>> occurenceMap = - new HashMap<>(); - for (MembershipState record : records) { - FederationNamenodeServiceState state = record.getState(); - TreeSet<MembershipState> matchingSet = occurenceMap.get(state); - if (matchingSet == null) { - // TreeSet orders elements by descending date via comparators - matchingSet = new TreeSet<>(); - occurenceMap.put(state, matchingSet); - } - matchingSet.add(record); - } - - // Select largest group - TreeSet<MembershipState> largestSet = new TreeSet<>(); - for (TreeSet<MembershipState> matchingSet : occurenceMap.values()) { - if (largestSet.size() < matchingSet.size()) { - largestSet = matchingSet; - } - } - - // If quorum, use the newest element here - if (largestSet.size() > records.size() / 2) { - return largestSet.first(); - // Otherwise, return most recent by class comparator - } else if (records.size() > 0) { - TreeSet<MembershipState> sortedList = new TreeSet<>(records); - LOG.debug("Quorum failed, using most recent: {}", sortedList.first()); - return sortedList.first(); - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java deleted file mode 100644 index eb117d6..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store.impl; - -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer; -import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker; -import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.apache.hadoop.hdfs.server.federation.store.records.Query; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.util.Time; - -/** - * Implementation of the {@link MountTableStore} state store API. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MountTableStoreImpl extends MountTableStore { - - public MountTableStoreImpl(StateStoreDriver driver) { - super(driver); - } - - @Override - public AddMountTableEntryResponse addMountTableEntry( - AddMountTableEntryRequest request) throws IOException { - MountTable mountTable = request.getEntry(); - if (mountTable != null) { - RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker(); - if (pc != null) { - pc.checkPermission(mountTable, FsAction.WRITE); - } - } - - boolean status = getDriver().put(mountTable, false, true); - AddMountTableEntryResponse response = - AddMountTableEntryResponse.newInstance(); - response.setStatus(status); - return response; - } - - @Override - public UpdateMountTableEntryResponse updateMountTableEntry( - UpdateMountTableEntryRequest request) throws IOException { - MountTable mountTable = request.getEntry(); - if (mountTable != null) { - RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker(); - if (pc != null) { - pc.checkPermission(mountTable, FsAction.WRITE); - } - } - - boolean status = getDriver().put(mountTable, true, true); - UpdateMountTableEntryResponse response = - UpdateMountTableEntryResponse.newInstance(); - response.setStatus(status); - return response; - } - - @Override - public RemoveMountTableEntryResponse removeMountTableEntry( - RemoveMountTableEntryRequest request) throws IOException { - final String srcPath = request.getSrcPath(); - final MountTable partial = MountTable.newInstance(); - partial.setSourcePath(srcPath); - final Query<MountTable> query = new Query<>(partial); - final MountTable deleteEntry = getDriver().get(getRecordClass(), query); - - boolean status = false; - if (deleteEntry != null) { - RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker(); - if (pc != null) { - pc.checkPermission(deleteEntry, FsAction.WRITE); - } - status = getDriver().remove(deleteEntry); - } - - 
RemoveMountTableEntryResponse response = - RemoveMountTableEntryResponse.newInstance(); - response.setStatus(status); - return response; - } - - @Override - public GetMountTableEntriesResponse getMountTableEntries( - GetMountTableEntriesRequest request) throws IOException { - RouterPermissionChecker pc = - RouterAdminServer.getPermissionChecker(); - // Get all values from the cache - List<MountTable> records = getCachedRecords(); - - // Sort and filter - Collections.sort(records, MountTable.SOURCE_COMPARATOR); - String reqSrcPath = request.getSrcPath(); - if (reqSrcPath != null && !reqSrcPath.isEmpty()) { - // Return only entries beneath this path - Iterator<MountTable> it = records.iterator(); - while (it.hasNext()) { - MountTable record = it.next(); - String srcPath = record.getSourcePath(); - if (!srcPath.startsWith(reqSrcPath)) { - it.remove(); - } else if (pc != null) { - // do the READ permission check - try { - pc.checkPermission(record, FsAction.READ); - } catch (AccessControlException ignored) { - // Remove this mount table entry if it cannot - // be accessed by current user. 
- it.remove(); - } - } - } - } - - GetMountTableEntriesResponse response = - GetMountTableEntriesResponse.newInstance(); - response.setEntries(records); - response.setTimestamp(Time.now()); - return response; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java deleted file mode 100644 index d58c288..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store.impl; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.server.federation.store.RouterStore; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; -import org.apache.hadoop.hdfs.server.federation.store.records.Query; -import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; -import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; - -/** - * Implementation of the {@link RouterStore} state store API. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class RouterStoreImpl extends RouterStore { - - public RouterStoreImpl(StateStoreDriver driver) { - super(driver); - } - - @Override - public GetRouterRegistrationResponse getRouterRegistration( - GetRouterRegistrationRequest request) throws IOException { - - final RouterState partial = RouterState.newInstance(); - partial.setAddress(request.getRouterId()); - final Query<RouterState> query = new Query<RouterState>(partial); - RouterState record = getDriver().get(getRecordClass(), query); - if (record != null) { - overrideExpiredRecord(record); - } - GetRouterRegistrationResponse response = - GetRouterRegistrationResponse.newInstance(); - response.setRouter(record); - return response; - } - - @Override - public GetRouterRegistrationsResponse getRouterRegistrations( - GetRouterRegistrationsRequest request) throws IOException { - - // Get all values from the cache - QueryResult<RouterState> recordsAndTimeStamp = - getCachedRecordsAndTimeStamp(); - List<RouterState> records = recordsAndTimeStamp.getRecords(); - long timestamp = recordsAndTimeStamp.getTimestamp(); - - // Generate response - GetRouterRegistrationsResponse response = - GetRouterRegistrationsResponse.newInstance(); - response.setRouters(records); - response.setTimestamp(timestamp); - return response; - } - - @Override - public RouterHeartbeatResponse routerHeartbeat(RouterHeartbeatRequest request) - throws IOException { - - RouterState record = request.getRouter(); - boolean status = getDriver().put(record, true, false); - RouterHeartbeatResponse response = - RouterHeartbeatResponse.newInstance(status); - return response; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java deleted file mode 100644 index 1a50d15..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains implementations of the state store API interfaces. All classes - * derive from {@link - * org.apache.hadoop.hdfs.server.federation.store.RecordStore}. The API - * definitions are contained in the - * org.apache.hadoop.hdfs.server.federation.store package. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -package org.apache.hadoop.hdfs.server.federation.store.impl; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java deleted file mode 100644 index 949ec7c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * The federation state store tracks persistent values that are shared between - * multiple routers. - * <p> - * Data is stored in data records that inherit from a common class. 
Data records - * are serialized when written to the data store using a modular serialization - * implementation. The default is protobuf serialization. Data is stored as rows - * of records of the same type with each data member in a record representing a - * column. - * <p> - * The state store uses a modular data storage - * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver} to handle querying, updating and deleting data records. The - * data storage driver is initialized and maintained by the - * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService - * FederationStateStoreService}. The state store - * supports fetching all records of a type, filtering by column values or - * fetching a single record by its primary key. - * <p> - * The state store contains several API interfaces, one for each data record - * type. - * <p> - * <ul> - * <li>FederationMembershipStateStore: state of all Namenodes in the federation. - * Uses the MembershipState record. - * <li>FederationMountTableStore: Mount table mapping paths in the global - * namespace to individual subcluster paths. Uses the MountTable record. - * <li>RouterStateStore: State of all routers in the federation. Uses the - * RouterState record. - * </ul> - * <p> - * Each API is defined in a separate interface. The implementations of these - * interfaces are responsible for accessing the - * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver} to query, update and delete data records. 
- */ - -@InterfaceAudience.Private -@InterfaceStability.Evolving - -package org.apache.hadoop.hdfs.server.federation.store; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java deleted file mode 100644 index 2d9f102..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.server.federation.store.protocol; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; - -/** - * API request for adding a mount table entry to the state store. - */ -public abstract class AddMountTableEntryRequest { - - public static AddMountTableEntryRequest newInstance() { - return StateStoreSerializer.newRecord(AddMountTableEntryRequest.class); - } - - public static AddMountTableEntryRequest newInstance(MountTable newEntry) { - AddMountTableEntryRequest request = newInstance(); - request.setEntry(newEntry); - return request; - } - - @Public - @Unstable - public abstract MountTable getEntry(); - - @Public - @Unstable - public abstract void setEntry(MountTable mount); -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org