http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java new file mode 100644 index 0000000..1bd35f2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; + +/** + * Base implementation of a State Store driver. It contains default + * implementations for the optional functions. These implementations use an + * uncached read/write all algorithm for all changes. In most cases it is + * recommended to override the optional functions. + * <p> + * Drivers may optionally override additional routines for performance + * optimization, such as custom get/put/remove queries, depending on the + * capabilities of the data store. 
+ */ +public abstract class StateStoreBaseImpl extends StateStoreDriver { + + @Override + public <T extends BaseRecord> T get( + Class<T> clazz, Query<T> query) throws IOException { + List<T> records = getMultiple(clazz, query); + if (records.size() > 1) { + throw new IOException("Found more than one object in collection"); + } else if (records.size() == 1) { + return records.get(0); + } else { + return null; + } + } + + @Override + public <T extends BaseRecord> List<T> getMultiple( + Class<T> clazz, Query<T> query) throws IOException { + QueryResult<T> result = get(clazz); + List<T> records = result.getRecords(); + List<T> ret = filterMultiple(query, records); + if (ret == null) { + throw new IOException("Cannot fetch records from the store"); + } + return ret; + } + + @Override + public <T extends BaseRecord> boolean put( + T record, boolean allowUpdate, boolean errorIfExists) throws IOException { + List<T> singletonList = new ArrayList<>(); + singletonList.add(record); + return putAll(singletonList, allowUpdate, errorIfExists); + } + + @Override + public <T extends BaseRecord> boolean remove(T record) throws IOException { + final Query<T> query = new Query<T>(record); + Class<? extends BaseRecord> clazz = record.getClass(); + @SuppressWarnings("unchecked") + Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz); + return remove(recordClass, query) == 1; + } +} \ No newline at end of file
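The base class above reduces the single-record operations to the bulk primitives: get() delegates to getMultiple() and fails when the query matches more than one record, put() wraps the record in a one-element list for putAll(), and remove() turns the record itself into a Query on its primary key. A minimal sketch of how a caller exercises this contract follows; the driver variable and the use of MountTable as the record class are illustrative assumptions, not part of this commit:

    // Sketch only: assumes an initialized StateStoreDriver instance and the
    // MountTable record class from the federation store records package.
    MountTable partial = MountTable.newInstance();
    partial.setSourcePath("/data");
    Query<MountTable> query = new Query<>(partial);

    // Resolves via getMultiple(); throws IOException if more than one
    // record matches the query.
    MountTable match = driver.get(MountTable.class, query);

    // Delegates to putAll(); allowUpdate=true overwrites an existing record,
    // errorIfExists=false silently skips records that already exist.
    driver.put(match, true, false);

    // Removes exactly the record matching this record's primary key.
    boolean removed = driver.remove(match);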
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java new file mode 100644 index 0000000..6638d1c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; +import static org.apache.hadoop.util.Time.monotonicNow; +import static org.apache.hadoop.util.Time.now; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * {@link StateStoreDriver} implementation based on files. Instead of writing + * to the final location, each write goes to a temporary file that is then + * renamed "atomically" to the final destination, so readers never observe + * partially written records. + */ +public abstract class StateStoreFileBaseImpl + extends StateStoreSerializableImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileBaseImpl.class); + + /** File extension for temporary files. */ + private static final String TMP_MARK = ".tmp"; + /** We remove temporary files older than 10 seconds.
*/ + private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10); + /** File pattern for temporary records: file.XYZ.tmp. */ + private static final Pattern OLD_TMP_RECORD_PATTERN = + Pattern.compile(".+\\.(\\d+)\\.tmp"); + + /** If it is initialized. */ + private boolean initialized = false; + + + /** + * Get the reader of a record for the file system. + * + * @param path Path of the record to read. + * @return Reader for the record. + */ + protected abstract <T extends BaseRecord> BufferedReader getReader( + String path); + + /** + * Get the writer of a record for the file system. + * + * @param path Path of the record to write. + * @return Writer for the record. + */ + protected abstract <T extends BaseRecord> BufferedWriter getWriter( + String path); + + /** + * Check if a path exists. + * + * @param path Path to check. + * @return If the path exists. + */ + protected abstract boolean exists(String path); + + /** + * Make a directory. + * + * @param path Path of the directory to create. + * @return If the directory was created. + */ + protected abstract boolean mkdir(String path); + + /** + * Rename a file. This should be atomic. + * + * @param src Source name. + * @param dst Destination name. + * @return If the rename was successful. + */ + protected abstract boolean rename(String src, String dst); + + /** + * Remove a file. + * + * @param path Path for the file to remove + * @return If the file was removed. + */ + protected abstract boolean remove(String path); + + /** + * Get the children for a path. + * + * @param path Path to check. + * @return List of children. + */ + protected abstract List<String> getChildren(String path); + + /** + * Get root directory. + * + * @return Root directory. + */ + protected abstract String getRootDir(); + + /** + * Set the driver as initialized. + * + * @param ini If the driver is initialized. 
+ */ + public void setInitialized(boolean ini) { + this.initialized = ini; + } + + @Override + public boolean initDriver() { + String rootDir = getRootDir(); + try { + if (rootDir == null) { + LOG.error("Invalid root directory, unable to initialize driver."); + return false; + } + + // Check root path + if (!exists(rootDir)) { + if (!mkdir(rootDir)) { + LOG.error("Cannot create State Store root directory {}", rootDir); + return false; + } + } + } catch (Exception ex) { + LOG.error( + "Cannot initialize filesystem using root directory {}", rootDir, ex); + return false; + } + setInitialized(true); + return true; + } + + @Override + public <T extends BaseRecord> boolean initRecordStorage( + String className, Class<T> recordClass) { + + String dataDirPath = getRootDir() + "/" + className; + try { + // Create data directories for files + if (!exists(dataDirPath)) { + LOG.info("{} data directory doesn't exist, creating it", dataDirPath); + if (!mkdir(dataDirPath)) { + LOG.error("Cannot create data directory {}", dataDirPath); + return false; + } + } + } catch (Exception ex) { + LOG.error("Cannot create data directory {}", dataDirPath, ex); + return false; + } + return true; + } + + @Override + public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) + throws IOException { + verifyDriverReady(); + long start = monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + List<T> ret = new ArrayList<>(); + try { + String path = getPathForClass(clazz); + List<String> children = getChildren(path); + for (String child : children) { + String pathRecord = path + "/" + child; + if (child.endsWith(TMP_MARK)) { + LOG.debug("There is a temporary file {} in {}", child, path); + if (isOldTempRecord(child)) { + LOG.warn("Removing {} as it's an old temporary record", child); + remove(pathRecord); + } + } else { + T record = getRecord(pathRecord, clazz); + ret.add(record); + } + } + } catch (Exception e) { + if (metrics != null) { + metrics.addFailure(monotonicNow() - start); + } + String msg = "Cannot fetch records for " + clazz.getSimpleName(); + LOG.error(msg, e); + throw new IOException(msg, e); + } + + if (metrics != null) { + metrics.addRead(monotonicNow() - start); + } + return new QueryResult<T>(ret, getTime()); + } + + /** + * Check if a record is temporary and old. + * + * @param pathRecord Path for the record to check. + * @return If the record is temporary and old. + */ + @VisibleForTesting + public static boolean isOldTempRecord(final String pathRecord) { + if (!pathRecord.endsWith(TMP_MARK)) { + return false; + } + // Extract temporary record creation time + Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord); + if (m.find()) { + long time = Long.parseLong(m.group(1)); + return now() - time > OLD_TMP_RECORD_MS; + } + return false; + } + + /** + * Read a record from a file. + * + * @param path Path to the file containing the record. + * @param clazz Class of the record. + * @return Record read from the file. + * @throws IOException If the file cannot be read. 
+ */ + private <T extends BaseRecord> T getRecord( + final String path, final Class<T> clazz) throws IOException { + BufferedReader reader = getReader(path); + try { + String line; + while ((line = reader.readLine()) != null) { + if (!line.startsWith("#") && line.length() > 0) { + try { + T record = newRecord(line, clazz, false); + return record; + } catch (Exception ex) { + LOG.error("Cannot parse line {} in file {}", line, path, ex); + } + } + } + } finally { + if (reader != null) { + reader.close(); + } + } + throw new IOException("Cannot read " + path + " for record " + + clazz.getSimpleName()); + } + + /** + * Get the path for a record class. + * @param clazz Class of the record. + * @return Path for this record class. + */ + private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) { + String className = StateStoreUtils.getRecordName(clazz); + StringBuilder sb = new StringBuilder(); + sb.append(getRootDir()); + if (sb.charAt(sb.length() - 1) != '/') { + sb.append("/"); + } + sb.append(className); + return sb.toString(); + } + + @Override + public boolean isDriverReady() { + return this.initialized; + } + + @Override + public <T extends BaseRecord> boolean putAll( + List<T> records, boolean allowUpdate, boolean errorIfExists) + throws StateStoreUnavailableException { + verifyDriverReady(); + if (records.isEmpty()) { + return true; + } + + long start = monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + + // Check if any record exists + Map<String, T> toWrite = new HashMap<>(); + for (T record : records) { + Class<? extends BaseRecord> recordClass = record.getClass(); + String path = getPathForClass(recordClass); + String primaryKey = getPrimaryKey(record); + String recordPath = path + "/" + primaryKey; + + if (exists(recordPath)) { + if (allowUpdate) { + // Update the mod time stamp. Many backends will use their + // own timestamp for the mod time. + record.setDateModified(this.getTime()); + toWrite.put(recordPath, record); + } else if (errorIfExists) { + LOG.error("Attempt to insert record {} that already exists", + recordPath); + if (metrics != null) { + metrics.addFailure(monotonicNow() - start); + } + return false; + } else { + LOG.debug("Not updating {}", record); + } + } else { + toWrite.put(recordPath, record); + } + } + + // Write the records + boolean success = true; + for (Entry<String, T> entry : toWrite.entrySet()) { + String recordPath = entry.getKey(); + String recordPathTemp = recordPath + "." 
+ now() + TMP_MARK; + BufferedWriter writer = getWriter(recordPathTemp); + try { + T record = entry.getValue(); + String line = serializeString(record); + writer.write(line); + } catch (IOException e) { + LOG.error("Cannot write {}", recordPathTemp, e); + success = false; + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.error("Cannot close the writer for {}", recordPathTemp); + } + } + } + // Commit + if (!rename(recordPathTemp, recordPath)) { + LOG.error("Failed committing record into {}", recordPath); + success = false; + } + } + + long end = monotonicNow(); + if (metrics != null) { + if (success) { + metrics.addWrite(end - start); + } else { + metrics.addFailure(end - start); + } + } + return success; + } + + @Override + public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) + throws StateStoreUnavailableException { + verifyDriverReady(); + + if (query == null) { + return 0; + } + + long start = Time.monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + int removed = 0; + // Get the current records + try { + final QueryResult<T> result = get(clazz); + final List<T> existingRecords = result.getRecords(); + // Write all of the existing records except those to be removed + final List<T> recordsToRemove = filterMultiple(query, existingRecords); + boolean success = true; + for (T recordToRemove : recordsToRemove) { + String path = getPathForClass(clazz); + String primaryKey = getPrimaryKey(recordToRemove); + String recordToRemovePath = path + "/" + primaryKey; + if (remove(recordToRemovePath)) { + removed++; + } else { + LOG.error("Cannot remove record {}", recordToRemovePath); + success = false; + } + } + if (!success) { + LOG.error("Cannot remove records {} query {}", clazz, query); + if (metrics != null) { + metrics.addFailure(monotonicNow() - start); + } + } + } catch (IOException e) { + LOG.error("Cannot remove records {} query {}", clazz, query, e); + if (metrics != null) { + metrics.addFailure(monotonicNow() - start); + } + } + + if (removed > 0 && metrics != null) { + metrics.addRemove(monotonicNow() - start); + } + return removed; + } + + @Override + public <T extends BaseRecord> boolean removeAll(Class<T> clazz) + throws StateStoreUnavailableException { + verifyDriverReady(); + long start = Time.monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + + boolean success = true; + String path = getPathForClass(clazz); + List<String> children = getChildren(path); + for (String child : children) { + String pathRecord = path + "/" + child; + if (!remove(pathRecord)) { + success = false; + } + } + + if (metrics != null) { + long time = Time.monotonicNow() - start; + if (success) { + metrics.addRemove(time); + } else { + metrics.addFailure(time); + } + } + return success; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java new file mode 100644 index 0000000..7d9ddc6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java @@ 
-0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.Files; + +/** + * StateStoreDriver implementation based on a local file. + */ +public class StateStoreFileImpl extends StateStoreFileBaseImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileImpl.class); + + /** Configuration keys. */ + public static final String FEDERATION_STORE_FILE_DIRECTORY = + RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory"; + + /** Root directory for the state store. 
*/ + private String rootDirectory; + + + @Override + protected boolean exists(String path) { + File test = new File(path); + return test.exists(); + } + + @Override + protected boolean mkdir(String path) { + File dir = new File(path); + return dir.mkdirs(); + } + + @Override + protected boolean rename(String src, String dst) { + try { + Files.move(new File(src), new File(dst)); + return true; + } catch (IOException e) { + LOG.error("Cannot rename {} to {}", src, dst, e); + return false; + } + } + + @Override + protected boolean remove(String path) { + File file = new File(path); + return file.delete(); + } + + @Override + protected String getRootDir() { + if (this.rootDirectory == null) { + String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY); + if (dir == null) { + File tempDir = Files.createTempDir(); + dir = tempDir.getAbsolutePath(); + LOG.warn("The root directory is not available, using {}", dir); + } + this.rootDirectory = dir; + } + return this.rootDirectory; + } + + @Override + protected <T extends BaseRecord> BufferedReader getReader(String filename) { + BufferedReader reader = null; + try { + LOG.debug("Loading file: {}", filename); + File file = new File(filename); + FileInputStream fis = new FileInputStream(file); + InputStreamReader isr = + new InputStreamReader(fis, StandardCharsets.UTF_8); + reader = new BufferedReader(isr); + } catch (Exception ex) { + LOG.error("Cannot open read stream for record {}", filename, ex); + } + return reader; + } + + @Override + protected <T extends BaseRecord> BufferedWriter getWriter(String filename) { + BufferedWriter writer = null; + try { + LOG.debug("Writing file: {}", filename); + File file = new File(filename); + FileOutputStream fos = new FileOutputStream(file, false); + OutputStreamWriter osw = + new OutputStreamWriter(fos, StandardCharsets.UTF_8); + writer = new BufferedWriter(osw); + } catch (IOException e) { + LOG.error("Cannot open write stream for record {}", filename, e); + } + return writer; + } + + @Override + public void close() throws Exception { + setInitialized(false); + } + + @Override + protected List<String> getChildren(String path) { + List<String> ret = new LinkedList<>(); + File dir = new File(path); + File[] files = dir.listFiles(); + if (files != null) { + for (File file : files) { + String filename = file.getName(); + ret.add(filename); + } + } + return ret; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java new file mode 100644 index 0000000..ad822fb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * StateStoreDriver implementation based on a filesystem. The most common + * deployment uses HDFS as a backend. + */ +public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileSystemImpl.class); + + + /** Configuration keys. */ + public static final String FEDERATION_STORE_FS_PATH = + RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path"; + + /** File system to back the State Store. */ + private FileSystem fs; + /** Working path in the filesystem.
*/ + private String workPath; + + @Override + protected boolean exists(String path) { + try { + return fs.exists(new Path(path)); + } catch (IOException e) { + return false; + } + } + + @Override + protected boolean mkdir(String path) { + try { + return fs.mkdirs(new Path(path)); + } catch (IOException e) { + return false; + } + } + + @Override + protected boolean rename(String src, String dst) { + try { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem)fs; + dfs.rename(new Path(src), new Path(dst), Options.Rename.OVERWRITE); + return true; + } else { + // Replace should be atomic but not available + if (fs.exists(new Path(dst))) { + fs.delete(new Path(dst), true); + } + return fs.rename(new Path(src), new Path(dst)); + } + } catch (Exception e) { + LOG.error("Cannot rename {} to {}", src, dst, e); + return false; + } + } + + @Override + protected boolean remove(String path) { + try { + return fs.delete(new Path(path), true); + } catch (Exception e) { + LOG.error("Cannot remove {}", path, e); + return false; + } + } + + @Override + protected String getRootDir() { + if (this.workPath == null) { + String rootPath = getConf().get(FEDERATION_STORE_FS_PATH); + URI workUri; + try { + workUri = new URI(rootPath); + fs = FileSystem.get(workUri, getConf()); + } catch (Exception ex) { + return null; + } + this.workPath = rootPath; + } + return this.workPath; + } + + @Override + public void close() throws Exception { + if (fs != null) { + fs.close(); + } + } + + @Override + protected <T extends BaseRecord> BufferedReader getReader(String pathName) { + BufferedReader reader = null; + Path path = new Path(pathName); + try { + FSDataInputStream fdis = fs.open(path); + InputStreamReader isr = + new InputStreamReader(fdis, StandardCharsets.UTF_8); + reader = new BufferedReader(isr); + } catch (IOException ex) { + LOG.error("Cannot open read stream for {}", path); + } + return reader; + } + + @Override + protected <T extends BaseRecord> BufferedWriter getWriter(String pathName) { + BufferedWriter writer = null; + Path path = new Path(pathName); + try { + FSDataOutputStream fdos = fs.create(path, true); + OutputStreamWriter osw = + new OutputStreamWriter(fdos, StandardCharsets.UTF_8); + writer = new BufferedWriter(osw); + } catch (IOException ex) { + LOG.error("Cannot open write stream for {}", path); + } + return writer; + } + + @Override + protected List<String> getChildren(String pathName) { + List<String> ret = new LinkedList<>(); + Path path = new Path(workPath, pathName); + try { + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + Path filePath = file.getPath(); + String fileName = filePath.getName(); + ret.add(fileName); + } + } catch (Exception e) { + LOG.error("Cannot get children for {}", pathName, e); + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java new file mode 100644 index 0000000..7bc93de --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; + +/** + * State Store driver that stores a serialization of the records. The serializer + * is pluggable. + */ +public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl { + + /** Mark for slashes in path names. */ + protected static final String SLASH_MARK = "0SLASH0"; + /** Mark for colon in path names. */ + protected static final String COLON_MARK = "_"; + + /** Default serializer for this driver. */ + private StateStoreSerializer serializer; + + + @Override + public boolean init(final Configuration config, final String id, + final Collection<Class<? extends BaseRecord>> records, + final StateStoreMetrics metrics) { + boolean ret = super.init(config, id, records, metrics); + + this.serializer = StateStoreSerializer.getSerializer(config); + + return ret; + } + + /** + * Serialize a record using the serializer. + * @param record Record to serialize. + * @return Byte array with the serialization of the record. + */ + protected <T extends BaseRecord> byte[] serialize(T record) { + return serializer.serialize(record); + } + + /** + * Serialize a record using the serializer. + * @param record Record to serialize. + * @return String with the serialization of the record. + */ + protected <T extends BaseRecord> String serializeString(T record) { + return serializer.serializeString(record); + } + + /** + * Creates a record from an input data string. + * @param data Serialized text of the record. + * @param clazz Record class. + * @param includeDates If dateModified and dateCreated are serialized. + * @return The created record. + * @throws IOException + */ + protected <T extends BaseRecord> T newRecord( + String data, Class<T> clazz, boolean includeDates) throws IOException { + return serializer.deserialize(data, clazz); + } + + /** + * Get the primary key for a record. If we don't want to store in folders, we + * need to remove / from the name. + * + * @param record Record to get the primary key for. + * @return Primary key for the record. 
+ */ + protected static String getPrimaryKey(BaseRecord record) { + String primaryKey = record.getPrimaryKey(); + primaryKey = primaryKey.replaceAll("/", SLASH_MARK); + primaryKey = primaryKey.replaceAll(":", COLON_MARK); + return primaryKey; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java new file mode 100644 index 0000000..45c5dd6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializerPBImpl.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.IOException; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.codec.binary.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the State Store serializer. + */ +public final class StateStoreSerializerPBImpl extends StateStoreSerializer { + + private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb"; + private static final String PB_IMPL_CLASS_SUFFIX = "PBImpl"; + + private Configuration localConf = new Configuration(); + + + private StateStoreSerializerPBImpl() { + } + + @Override + @SuppressWarnings("unchecked") + public <T> T newRecordInstance(Class<T> clazz) { + try { + String clazzPBImpl = getPBImplClassName(clazz); + Class<?> pbClazz = localConf.getClassByName(clazzPBImpl); + Object retObject = ReflectionUtils.newInstance(pbClazz, localConf); + return (T)retObject; + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + private String getPBImplClassName(Class<?> clazz) { + String srcPackagePart = getPackageName(clazz); + String srcClassName = getClassName(clazz); + String destPackagePart = srcPackagePart + "." 
+ PB_IMPL_PACKAGE_SUFFIX; + String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX; + return destPackagePart + "." + destClassPart; + } + + private String getClassName(Class<?> clazz) { + String fqName = clazz.getName(); + return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length())); + } + + private String getPackageName(Class<?> clazz) { + return clazz.getPackage().getName(); + } + + @Override + public byte[] serialize(BaseRecord record) { + byte[] byteArray64 = null; + if (record instanceof PBRecord) { + PBRecord recordPB = (PBRecord) record; + Message msg = recordPB.getProto(); + byte[] byteArray = msg.toByteArray(); + byteArray64 = Base64.encodeBase64(byteArray, false); + } + return byteArray64; + } + + @Override + public String serializeString(BaseRecord record) { + byte[] byteArray64 = serialize(record); + String base64Encoded = StringUtils.newStringUtf8(byteArray64); + return base64Encoded; + } + + @Override + public <T extends BaseRecord> T deserialize( + byte[] byteArray, Class<T> clazz) throws IOException { + + T record = newRecord(clazz); + if (record instanceof PBRecord) { + PBRecord pbRecord = (PBRecord) record; + byte[] byteArray64 = Base64.encodeBase64(byteArray, false); + String base64Encoded = StringUtils.newStringUtf8(byteArray64); + pbRecord.readInstance(base64Encoded); + } + return record; + } + + @Override + public <T extends BaseRecord> T deserialize(String data, Class<T> clazz) + throws IOException { + byte[] byteArray64 = Base64.decodeBase64(data); + return deserialize(byteArray64, clazz); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java new file mode 100644 index 0000000..cd5372d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName; +import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; +import static org.apache.hadoop.util.Time.monotonicNow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a + * backend. + * <p> + * The structure of the znodes in the ensemble is: + * PARENT_PATH + * |--- MOUNT + * |--- MEMBERSHIP + * |--- REBALANCER + * |--- ROUTERS + */ +public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreZooKeeperImpl.class); + + + /** Configuration keys. */ + public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX = + RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk."; + public static final String FEDERATION_STORE_ZK_PARENT_PATH = + FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path"; + public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT = + "/hdfs-federation"; + + + /** Directory to store the state store data. */ + private String baseZNode; + + /** Interface to ZooKeeper. 
*/ + private ZKCuratorManager zkManager; + + + @Override + public boolean initDriver() { + LOG.info("Initializing ZooKeeper connection"); + + Configuration conf = getConf(); + baseZNode = conf.get( + FEDERATION_STORE_ZK_PARENT_PATH, + FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT); + try { + this.zkManager = new ZKCuratorManager(conf); + this.zkManager.start(); + } catch (IOException e) { + LOG.error("Cannot initialize the ZK connection", e); + return false; + } + return true; + } + + @Override + public <T extends BaseRecord> boolean initRecordStorage( + String className, Class<T> clazz) { + try { + String checkPath = getNodePath(baseZNode, className); + zkManager.createRootDirRecursively(checkPath); + return true; + } catch (Exception e) { + LOG.error("Cannot initialize ZK node for {}: {}", + className, e.getMessage()); + return false; + } + } + + @Override + public void close() throws Exception { + if (zkManager != null) { + zkManager.close(); + } + } + + @Override + public boolean isDriverReady() { + if (zkManager == null) { + return false; + } + CuratorFramework curator = zkManager.getCurator(); + if (curator == null) { + return false; + } + return curator.getState() == CuratorFrameworkState.STARTED; + } + + @Override + public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) + throws IOException { + verifyDriverReady(); + long start = monotonicNow(); + List<T> ret = new ArrayList<>(); + String znode = getZNodeForClass(clazz); + try { + List<String> children = zkManager.getChildren(znode); + for (String child : children) { + try { + String path = getNodePath(znode, child); + Stat stat = new Stat(); + String data = zkManager.getStringData(path, stat); + boolean corrupted = false; + if (data == null || data.equals("")) { + // All records should have data, otherwise this is corrupted + corrupted = true; + } else { + try { + T record = createRecord(data, stat, clazz); + ret.add(record); + } catch (IOException e) { + LOG.error("Cannot create record type \"{}\" from \"{}\": {}", + clazz.getSimpleName(), data, e.getMessage()); + corrupted = true; + } + } + + if (corrupted) { + LOG.error("Cannot get data for {} at {}, cleaning corrupted data", + child, path); + zkManager.delete(path); + } + } catch (Exception e) { + LOG.error("Cannot get data for {}: {}", child, e.getMessage()); + } + } + } catch (Exception e) { + getMetrics().addFailure(monotonicNow() - start); + String msg = "Cannot get children for \"" + znode + "\": " + + e.getMessage(); + LOG.error(msg); + throw new IOException(msg); + } + long end = monotonicNow(); + getMetrics().addRead(end - start); + return new QueryResult<T>(ret, getTime()); + } + + @Override + public <T extends BaseRecord> boolean putAll( + List<T> records, boolean update, boolean error) throws IOException { + verifyDriverReady(); + if (records.isEmpty()) { + return true; + } + + // All records should be the same + T record0 = records.get(0); + Class<? 
extends BaseRecord> recordClass = record0.getClass(); + String znode = getZNodeForClass(recordClass); + + long start = monotonicNow(); + boolean status = true; + for (T record : records) { + String primaryKey = getPrimaryKey(record); + String recordZNode = getNodePath(znode, primaryKey); + byte[] data = serialize(record); + if (!writeNode(recordZNode, data, update, error)){ + status = false; + } + } + long end = monotonicNow(); + if (status) { + getMetrics().addWrite(end - start); + } else { + getMetrics().addFailure(end - start); + } + return status; + } + + @Override + public <T extends BaseRecord> int remove( + Class<T> clazz, Query<T> query) throws IOException { + verifyDriverReady(); + if (query == null) { + return 0; + } + + // Read the current data + long start = monotonicNow(); + List<T> records = null; + try { + QueryResult<T> result = get(clazz); + records = result.getRecords(); + } catch (IOException ex) { + LOG.error("Cannot get existing records", ex); + getMetrics().addFailure(monotonicNow() - start); + return 0; + } + + // Check the records to remove + String znode = getZNodeForClass(clazz); + List<T> recordsToRemove = filterMultiple(query, records); + + // Remove the records + int removed = 0; + for (T existingRecord : recordsToRemove) { + LOG.info("Removing \"{}\"", existingRecord); + try { + String primaryKey = getPrimaryKey(existingRecord); + String path = getNodePath(znode, primaryKey); + if (zkManager.delete(path)) { + removed++; + } else { + LOG.error("Did not remove \"{}\"", existingRecord); + } + } catch (Exception e) { + LOG.error("Cannot remove \"{}\"", existingRecord, e); + getMetrics().addFailure(monotonicNow() - start); + } + } + long end = monotonicNow(); + if (removed > 0) { + getMetrics().addRemove(end - start); + } + return removed; + } + + @Override + public <T extends BaseRecord> boolean removeAll(Class<T> clazz) + throws IOException { + long start = monotonicNow(); + boolean status = true; + String znode = getZNodeForClass(clazz); + LOG.info("Deleting all children under {}", znode); + try { + List<String> children = zkManager.getChildren(znode); + for (String child : children) { + String path = getNodePath(znode, child); + LOG.info("Deleting {}", path); + zkManager.delete(path); + } + } catch (Exception e) { + LOG.error("Cannot remove {}: {}", znode, e.getMessage()); + status = false; + } + long time = monotonicNow() - start; + if (status) { + getMetrics().addRemove(time); + } else { + getMetrics().addFailure(time); + } + return status; + } + + private boolean writeNode( + String znode, byte[] bytes, boolean update, boolean error) { + try { + boolean created = zkManager.create(znode); + if (!update && !created && error) { + LOG.info("Cannot write record \"{}\", it already exists", znode); + return false; + } + + // Write data + zkManager.setData(znode, bytes, -1); + return true; + } catch (Exception e) { + LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage()); + } + return false; + } + + /** + * Get the ZNode for a class. + * + * @param clazz Record class to evaluate. + * @return The ZNode for the class. + */ + private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) { + String className = getRecordName(clazz); + return getNodePath(baseZNode, className); + } + + /** + * Creates a record from a string returned by ZooKeeper. + * + * @param data The data to write. + * @param stat Stat of the data record to create. + * @param clazz The data record type to create. + * @return The created record. 
+ * @throws IOException If the record cannot be deserialized. + */ + private <T extends BaseRecord> T createRecord( + String data, Stat stat, Class<T> clazz) throws IOException { + T record = newRecord(data, clazz, false); + record.setDateCreated(stat.getCtime()); + record.setDateModified(stat.getMtime()); + return record; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java new file mode 100644 index 0000000..a18433e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/package-info.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementations of state store data providers/drivers. Each driver is + * responsible for maintaining, querying, updating and deleting persistent data + * records. Data records are defined as subclasses of + * {@link org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord}. + * Each driver implements the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver} interface. + * <p> + * Currently supported drivers: + * <ul> + * <li>{@link StateStoreFileImpl} A file-based data storage backend. + * <li>{@link StateStoreZooKeeperImpl} A ZooKeeper-based data storage backend.
+ * </ul> + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java new file mode 100644 index 0000000..da998b5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/package-info.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The state store uses modular data storage classes derived from + * StateStoreDriver to handle querying, updating and deleting data records. The + * data storage driver is initialized and maintained by the StateStoreService. + * The state store supports fetching all records of a type, filtering by column + * values or fetching a single record by its primary key. + * <p> + * Each data storage backend is required to implement the methods contained in + * the StateStoreDriver interface. These methods allow the querying, updating, + * inserting and deleting of data records into the state store. 
+ */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java new file mode 100644 index 0000000..57b7b61 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the {@link MembershipStore} State Store API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MembershipStoreImpl + extends MembershipStore implements StateStoreCache { + + private static final Logger LOG = + LoggerFactory.getLogger(MembershipStoreImpl.class); + + + /** Reported namespaces that are not decommissioned. */ + private final Set<FederationNamespaceInfo> activeNamespaces; + + /** Namenodes (after evaluating the quorum) that are active in the cluster. */ + private final Map<String, MembershipState> activeRegistrations; + /** Namenode status reports (raw) that were discarded for being too old. */ + private final Map<String, MembershipState> expiredRegistrations; + + /** Lock to access the local memory cache. 
*/ + private final ReadWriteLock cacheReadWriteLock = + new ReentrantReadWriteLock(); + private final Lock cacheReadLock = cacheReadWriteLock.readLock(); + private final Lock cacheWriteLock = cacheReadWriteLock.writeLock(); + + + public MembershipStoreImpl(StateStoreDriver driver) { + super(driver); + + this.activeRegistrations = new HashMap<>(); + this.expiredRegistrations = new HashMap<>(); + this.activeNamespaces = new TreeSet<>(); + } + + @Override + public GetNamenodeRegistrationsResponse getExpiredNamenodeRegistrations( + GetNamenodeRegistrationsRequest request) throws IOException { + + GetNamenodeRegistrationsResponse response = + GetNamenodeRegistrationsResponse.newInstance(); + cacheReadLock.lock(); + try { + Collection<MembershipState> vals = this.expiredRegistrations.values(); + List<MembershipState> copyVals = new ArrayList<>(vals); + response.setNamenodeMemberships(copyVals); + } finally { + cacheReadLock.unlock(); + } + return response; + } + + @Override + public GetNamespaceInfoResponse getNamespaceInfo( + GetNamespaceInfoRequest request) throws IOException { + + Set<FederationNamespaceInfo> namespaces = new HashSet<>(); + cacheReadLock.lock(); + try { + namespaces.addAll(activeNamespaces); + } finally { + cacheReadLock.unlock(); + } + + GetNamespaceInfoResponse response = + GetNamespaceInfoResponse.newInstance(namespaces); + return response; + } + + @Override + public GetNamenodeRegistrationsResponse getNamenodeRegistrations( + final GetNamenodeRegistrationsRequest request) throws IOException { + + // TODO Cache some common queries and sorts + List<MembershipState> ret = null; + + cacheReadLock.lock(); + try { + Collection<MembershipState> registrations = activeRegistrations.values(); + MembershipState partialMembership = request.getPartialMembership(); + if (partialMembership == null) { + ret = new ArrayList<>(registrations); + } else { + Query<MembershipState> query = new Query<>(partialMembership); + ret = filterMultiple(query, registrations); + } + } finally { + cacheReadLock.unlock(); + } + // Sort in ascending update date order + Collections.sort(ret); + + GetNamenodeRegistrationsResponse response = + GetNamenodeRegistrationsResponse.newInstance(ret); + return response; + } + + @Override + public NamenodeHeartbeatResponse namenodeHeartbeat( + NamenodeHeartbeatRequest request) throws IOException { + + MembershipState record = request.getNamenodeMembership(); + String nnId = record.getNamenodeKey(); + MembershipState existingEntry = null; + cacheReadLock.lock(); + try { + existingEntry = this.activeRegistrations.get(nnId); + } finally { + cacheReadLock.unlock(); + } + + if (existingEntry != null) { + if (existingEntry.getState() != record.getState()) { + LOG.info("NN registration state has changed: {} -> {}", + existingEntry, record); + } else { + LOG.debug("Updating NN registration: {} -> {}", existingEntry, record); + } + } else { + LOG.info("Inserting new NN registration: {}", record); + } + + boolean status = getDriver().put(record, true, false); + + NamenodeHeartbeatResponse response = + NamenodeHeartbeatResponse.newInstance(status); + return response; + } + + @Override + public boolean loadCache(boolean force) throws IOException { + super.loadCache(force); + + // Update local cache atomically + cacheWriteLock.lock(); + try { + this.activeRegistrations.clear(); + this.expiredRegistrations.clear(); + this.activeNamespaces.clear(); + + // Build list of NN registrations: nnId -> registration list + Map<String, List<MembershipState>> nnRegistrations = new HashMap<>();
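+ // A Namenode can be reported by several routers, so one nnId may map to + // multiple records; the quorum logic below picks one representative entry.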
+ List<MembershipState> cachedRecords = getCachedRecords(); + for (MembershipState membership : cachedRecords) { + String nnId = membership.getNamenodeKey(); + if (membership.getState() == FederationNamenodeServiceState.EXPIRED) { + // Expired, RPC service does not use these + String key = membership.getPrimaryKey(); + this.expiredRegistrations.put(key, membership); + } else { + // This is a valid NN registration, build a list of all registrations + // using the NN id to use for the quorum calculation. + List<MembershipState> nnRegistration = + nnRegistrations.get(nnId); + if (nnRegistration == null) { + nnRegistration = new LinkedList<>(); + nnRegistrations.put(nnId, nnRegistration); + } + nnRegistration.add(membership); + String bpId = membership.getBlockPoolId(); + String cId = membership.getClusterId(); + String nsId = membership.getNameserviceId(); + FederationNamespaceInfo nsInfo = + new FederationNamespaceInfo(bpId, cId, nsId); + this.activeNamespaces.add(nsInfo); + } + } + + // Calculate most representative entry for each active NN id + for (List<MembershipState> nnRegistration : nnRegistrations.values()) { + // Run quorum based on NN state + MembershipState representativeRecord = + getRepresentativeQuorum(nnRegistration); + String nnKey = representativeRecord.getNamenodeKey(); + this.activeRegistrations.put(nnKey, representativeRecord); + } + LOG.debug("Refreshed {} NN registrations from State Store", + cachedRecords.size()); + } finally { + cacheWriteLock.unlock(); + } + return true; + } + + @Override + public UpdateNamenodeRegistrationResponse updateNamenodeRegistration( + UpdateNamenodeRegistrationRequest request) throws IOException { + + boolean status = false; + cacheWriteLock.lock(); + try { + String namenode = MembershipState.getNamenodeKey( + request.getNameserviceId(), request.getNamenodeId()); + MembershipState member = this.activeRegistrations.get(namenode); + if (member != null) { + member.setState(request.getState()); + status = true; + } + } finally { + cacheWriteLock.unlock(); + } + UpdateNamenodeRegistrationResponse response = + UpdateNamenodeRegistrationResponse.newInstance(status); + return response; + } + + /** + * Picks the most recent entry in the subset that is most agreeable on the + * specified field. 
1) If a majority of the collection has the same value for + * the field, the first sorted entry within the subset that matches the + * majority value is chosen; 2) otherwise, the first sorted entry in the set + * of all entries is chosen. + * + * @param records - Collection of state store record objects of the same type + * @return The record that is most representative of the field. + */ + private MembershipState getRepresentativeQuorum( + Collection<MembershipState> records) { + + // Collate objects by field value: field value -> ordered set of records + Map<FederationNamenodeServiceState, TreeSet<MembershipState>> occurrenceMap = + new HashMap<>(); + for (MembershipState record : records) { + FederationNamenodeServiceState state = record.getState(); + TreeSet<MembershipState> matchingSet = occurrenceMap.get(state); + if (matchingSet == null) { + // TreeSet orders elements by descending date via comparators + matchingSet = new TreeSet<>(); + occurrenceMap.put(state, matchingSet); + } + matchingSet.add(record); + } + + // Select largest group + TreeSet<MembershipState> largestSet = new TreeSet<>(); + for (TreeSet<MembershipState> matchingSet : occurrenceMap.values()) { + if (largestSet.size() < matchingSet.size()) { + largestSet = matchingSet; + } + } + + // If there is a quorum, use the newest element of the largest group + if (largestSet.size() > records.size() / 2) { + return largestSet.first(); + // Otherwise, return most recent by class comparator + } else if (!records.isEmpty()) { + TreeSet<MembershipState> sortedList = new TreeSet<>(records); + LOG.debug("Quorum failed, using most recent: {}", sortedList.first()); + return sortedList.first(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java new file mode 100644 index 0000000..eb117d6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.federation.store.impl; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer; +import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.util.Time; + +/** + * Implementation of the {@link MountTableStore} state store API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MountTableStoreImpl extends MountTableStore { + + public MountTableStoreImpl(StateStoreDriver driver) { + super(driver); + } + + @Override + public AddMountTableEntryResponse addMountTableEntry( + AddMountTableEntryRequest request) throws IOException { + MountTable mountTable = request.getEntry(); + if (mountTable != null) { + RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker(); + if (pc != null) { + pc.checkPermission(mountTable, FsAction.WRITE); + } + } + + boolean status = getDriver().put(mountTable, false, true); + AddMountTableEntryResponse response = + AddMountTableEntryResponse.newInstance(); + response.setStatus(status); + return response; + } + + @Override + public UpdateMountTableEntryResponse updateMountTableEntry( + UpdateMountTableEntryRequest request) throws IOException { + MountTable mountTable = request.getEntry(); + if (mountTable != null) { + RouterPermissionChecker pc = RouterAdminServer.getPermissionChecker(); + if (pc != null) { + pc.checkPermission(mountTable, FsAction.WRITE); + } + } + + boolean status = getDriver().put(mountTable, true, true); + UpdateMountTableEntryResponse response = + UpdateMountTableEntryResponse.newInstance(); + response.setStatus(status); + return response; + } + + @Override + public RemoveMountTableEntryResponse removeMountTableEntry( + RemoveMountTableEntryRequest request) throws IOException { + final String srcPath = request.getSrcPath(); + final MountTable partial = MountTable.newInstance(); + partial.setSourcePath(srcPath); + final Query<MountTable> query = new Query<>(partial); + final MountTable deleteEntry = getDriver().get(getRecordClass(), query); + + boolean status = false; + if (deleteEntry != null) { + RouterPermissionChecker pc = 
RouterAdminServer.getPermissionChecker(); + if (pc != null) { + pc.checkPermission(deleteEntry, FsAction.WRITE); + } + status = getDriver().remove(deleteEntry); + } + + RemoveMountTableEntryResponse response = + RemoveMountTableEntryResponse.newInstance(); + response.setStatus(status); + return response; + } + + @Override + public GetMountTableEntriesResponse getMountTableEntries( + GetMountTableEntriesRequest request) throws IOException { + RouterPermissionChecker pc = + RouterAdminServer.getPermissionChecker(); + // Get all values from the cache + List<MountTable> records = getCachedRecords(); + + // Sort and filter + Collections.sort(records, MountTable.SOURCE_COMPARATOR); + String reqSrcPath = request.getSrcPath(); + if (reqSrcPath != null && !reqSrcPath.isEmpty()) { + // Return only entries beneath this path + Iterator<MountTable> it = records.iterator(); + while (it.hasNext()) { + MountTable record = it.next(); + String srcPath = record.getSourcePath(); + if (!srcPath.startsWith(reqSrcPath)) { + it.remove(); + } else if (pc != null) { + // do the READ permission check + try { + pc.checkPermission(record, FsAction.READ); + } catch (AccessControlException ignored) { + // Remove this mount table entry if it cannot + // be accessed by current user. + it.remove(); + } + } + } + } + + GetMountTableEntriesResponse response = + GetMountTableEntriesResponse.newInstance(); + response.setEntries(records); + response.setTimestamp(Time.now()); + return response; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java new file mode 100644 index 0000000..d58c288 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/RouterStoreImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.impl; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; + +/** + * Implementation of the {@link RouterStore} state store API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RouterStoreImpl extends RouterStore { + + public RouterStoreImpl(StateStoreDriver driver) { + super(driver); + } + + @Override + public GetRouterRegistrationResponse getRouterRegistration( + GetRouterRegistrationRequest request) throws IOException { + + final RouterState partial = RouterState.newInstance(); + partial.setAddress(request.getRouterId()); + final Query<RouterState> query = new Query<RouterState>(partial); + RouterState record = getDriver().get(getRecordClass(), query); + if (record != null) { + overrideExpiredRecord(record); + } + GetRouterRegistrationResponse response = + GetRouterRegistrationResponse.newInstance(); + response.setRouter(record); + return response; + } + + @Override + public GetRouterRegistrationsResponse getRouterRegistrations( + GetRouterRegistrationsRequest request) throws IOException { + + // Get all values from the cache + QueryResult<RouterState> recordsAndTimeStamp = + getCachedRecordsAndTimeStamp(); + List<RouterState> records = recordsAndTimeStamp.getRecords(); + long timestamp = recordsAndTimeStamp.getTimestamp(); + + // Generate response + GetRouterRegistrationsResponse response = + GetRouterRegistrationsResponse.newInstance(); + response.setRouters(records); + response.setTimestamp(timestamp); + return response; + } + + @Override + public RouterHeartbeatResponse routerHeartbeat(RouterHeartbeatRequest request) + throws IOException { + + RouterState record = request.getRouter(); + boolean status = getDriver().put(record, true, false); + RouterHeartbeatResponse response = + RouterHeartbeatResponse.newInstance(status); + return response; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java new file mode 100644 index 0000000..1a50d15 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains implementations of the state store API interfaces. All classes + * derive from {@link + * org.apache.hadoop.hdfs.server.federation.store.RecordStore}. The API + * definitions are contained in the + * org.apache.hadoop.hdfs.server.federation.store package. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.store.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java new file mode 100644 index 0000000..949ec7c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/package-info.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The federation state store tracks persistent values that are shared between + * multiple routers. + * <p> + * Data is stored in data records that inherit from a common class. Data records + * are serialized when written to the data store using a modular serialization + * implementation. The default is protobuf serialization. Data is stored as rows + * of records of the same type with each data member in a record representing a + * column.
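+ * For example, each MembershipState row describes a single Namenode + * registration, with members such as its service state and block pool id + * stored as columns.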
+ * <p> + * The state store uses a modular data storage + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver} to handle querying, updating and deleting data records. The + * data storage driver is initialized and maintained by the + * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService + * StateStoreService}. The state store + * supports fetching all records of a type, filtering by column values or + * fetching a single record by its primary key. + * <p> + * The state store contains several API interfaces, one for each data record + * type. + * <p> + * <ul> + * <li>FederationMembershipStateStore: State of all Namenodes in the federation. + * Uses the MembershipState record. + * <li>FederationMountTableStore: Mount table mapping paths in the global + * namespace to individual subcluster paths. Uses the MountTable record. + * <li>RouterStateStore: State of all routers in the federation. Uses the + * RouterState record. + * </ul> + * <p> + * Each API is defined in a separate interface. The implementations of these + * interfaces are responsible for accessing the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver} to query, update and delete data records. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java new file mode 100644 index 0000000..2d9f102 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +/** + * API request for adding a mount table entry to the state store. + */ +public abstract class AddMountTableEntryRequest { + + public static AddMountTableEntryRequest newInstance() { + return StateStoreSerializer.newRecord(AddMountTableEntryRequest.class); + } + + public static AddMountTableEntryRequest newInstance(MountTable newEntry) { + AddMountTableEntryRequest request = newInstance(); + request.setEntry(newEntry); + return request; + } + + @Public + @Unstable + public abstract MountTable getEntry(); + + @Public + @Unstable + public abstract void setEntry(MountTable mount); +} \ No newline at end of file
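For context, a minimal sketch of how the pieces in this patch fit together end to end: a MountTable record is built, wrapped in the AddMountTableEntryRequest defined above, and handed to MountTableStoreImpl, which delegates to the driver's put(). Names outside this diff are illustrative only: the "/data" path, the surrounding class name, and the response getStatus() getter (only setStatus() appears in the patch) are assumptions.

import java.io.IOException;

import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;

/** Illustrative usage sketch; not part of the commit above. */
public class MountTableUsageSketch {

  public static boolean addEntry(StateStoreDriver driver) throws IOException {
    // Build the record, as removeMountTableEntry() builds its partial query.
    MountTable entry = MountTable.newInstance();
    entry.setSourcePath("/data"); // hypothetical mount point

    // Wrap the record in the request type defined in this patch.
    AddMountTableEntryRequest request =
        AddMountTableEntryRequest.newInstance(entry);

    // The store checks WRITE permission and calls getDriver().put(...).
    MountTableStoreImpl store = new MountTableStoreImpl(driver);
    AddMountTableEntryResponse response = store.addMountTableEntry(request);
    return response.getStatus(); // assumed getter paired with setStatus()
  }
}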