[
https://issues.apache.org/jira/browse/HDFS-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708123#comment-17708123
]
ASF GitHub Bot commented on HDFS-16943:
---------------------------------------
virajjasani commented on code in PR #5469:
URL: https://github.com/apache/hadoop/pull/5469#discussion_r1156454197
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java:
##########
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import org.apache.hadoop.hdfs.server.federation.router.security.token.SQLConnectionFactory;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.*;
+import static org.apache.hadoop.util.Time.*;
Review Comment:
nit: good to avoid `*`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java:
##########
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import org.apache.hadoop.hdfs.server.federation.router.security.token.SQLConnectionFactory;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.*;
+import static org.apache.hadoop.util.Time.*;
+
+/**
+ * StateStoreDriver implementation based on MySQL.
+ * There is a separate table for each record type. Each table just as two
+ * columns, recordKey and recordValue.
+ */
+public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
+  public static final String SQL_STATE_STORE_CONF_PREFIX = "state-store-mysql.";
+ public static final String CONNECTION_URL =
+ SQL_STATE_STORE_CONF_PREFIX + "connection.url";
+ public static final String CONNECTION_USERNAME =
+ SQL_STATE_STORE_CONF_PREFIX + "connection.username";
+ public static final String CONNECTION_PASSWORD =
+ SQL_STATE_STORE_CONF_PREFIX + "connection.password";
+ public static final String CONNECTION_DRIVER =
+ SQL_STATE_STORE_CONF_PREFIX + "connection.driver";
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreSerializableImpl.class);
+ private SQLConnectionFactory connectionFactory;
+ /** If the driver has been initialized. */
+ private boolean initialized = false;
+ private final static Set<String> VALID_TABLES = Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(
+ MembershipState.class.getSimpleName(),
+ RouterState.class.getSimpleName(),
+ MountTable.class.getSimpleName(),
+ DisabledNameservice.class.getSimpleName()
+ ))
+ );
+
+ @Override
+ public boolean initDriver() {
+ Configuration conf = getConf();
+    connectionFactory = new MySQLStateStoreHikariDataSourceConnectionFactory(conf);
+ initialized = true;
+ LOG.info("MySQL state store connection factory initialized");
+ return true;
+ }
+
+ @Override
+  public <T extends BaseRecord> boolean initRecordStorage(String className, Class<T> clazz) {
+ String tableName = getAndValidateTableNameForClass(clazz);
+ try (Connection connection = connectionFactory.getConnection();
+ ResultSet resultSet = connection
+ .getMetaData()
+ .getTables(null, null, tableName, null)) {
+ if (resultSet.next()) {
+ return true;
+ }
+ } catch (SQLException e) {
+ LOG.error("Could not check if table {} able exists", tableName);
+ }
+
+ try (Connection connection = connectionFactory.getConnection();
+ Statement statement = connection.createStatement()) {
+ String sql = String.format("CREATE TABLE %s ("
+ + "recordKey VARCHAR (255) NOT NULL,"
+ + "recordValue VARCHAR (2047) NOT NULL, "
+ + "PRIMARY KEY(recordKey))", tableName);
+ statement.execute(sql);
+ return true;
+ } catch (SQLException e) {
+ LOG.error(String.format("Cannot create table %s for record type %s.",
+ tableName, className), e.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isDriverReady() {
+ return this.initialized;
+ }
+
+ @Override
+ public void close() throws Exception {
+ connectionFactory.shutdown();
+ }
+
+ @Override
+ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
+ throws IOException {
+ String tableName = getAndValidateTableNameForClass(clazz);
+ verifyDriverReady();
+ long start = monotonicNow();
+ StateStoreMetrics metrics = getMetrics();
+ List<T> ret = new ArrayList<>();
+ try (Connection connection = connectionFactory.getConnection();
+ PreparedStatement statement = connection.prepareStatement(
+ String.format("SELECT * FROM %s", tableName))) {
+ try (ResultSet result = statement.executeQuery()) {
+ while(result.next()) {
+ String recordValue = result.getString("recordValue");
+ T record = newRecord(recordValue, clazz, false);
+ ret.add(record);
+ }
+ }
+ } catch (SQLException e) {
+ if (metrics != null) {
+ metrics.addFailure(monotonicNow() - start);
+ }
+ String msg = "Cannot fetch records for " + clazz.getSimpleName();
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+
+ if (metrics != null) {
+ metrics.addRead(monotonicNow() - start);
+ }
+ return new QueryResult<>(ret, getTime());
+ }
+
+ @Override
+ public <T extends BaseRecord> boolean putAll(
+      List<T> records, boolean allowUpdate, boolean errorIfExists) throws IOException {
+ if (records.isEmpty()) {
+ return true;
+ }
+
+ verifyDriverReady();
+ StateStoreMetrics metrics = getMetrics();
+
+ long start = monotonicNow();
+
+ boolean success = true;
+ for (T record : records) {
+ String tableName = getAndValidateTableNameForClass(record.getClass());
+ String primaryKey = getPrimaryKey(record);
+ String data = serializeString(record);
+
+ if (recordExists(tableName, primaryKey)) {
+ if (allowUpdate) {
+ // Update the mod time stamp. Many backends will use their
+ // own timestamp for the mod time.
+ record.setDateModified(this.getTime());
+ if (!updateRecord(tableName, primaryKey, data)) {
+ LOG.error("Cannot write {} into table {}", primaryKey, tableName);
+ success = false;
Review Comment:
Oh this is the reason why we will have to update one record at a time right?
> RBF: Implement MySQL based StateStoreDriver
> -------------------------------------------
>
> Key: HDFS-16943
> URL: https://issues.apache.org/jira/browse/HDFS-16943
> Project: Hadoop HDFS
> Issue Type: Task
> Components: hdfs, rbf
> Reporter: Simbarashe Dzinamarira
> Assignee: Simbarashe Dzinamarira
> Priority: Major
> Labels: pull-request-available
>
> RBF supports two types of StateStoreDrivers
> # StateStoreFileImpl
> # StateStoreZooKeeperImpl
> I propose implementing a third driver that is backed by MySQL.
> * StateStoreZooKeeperImpl requires an additional Zookeeper cluster.
> * StateStoreFileImpl can use one of the namenodes in the HDFS cluster, but
> that namenode becomes a single point of failure, introducing coupling between
> the federated clusters.
> HADOOP-18535 implemented a MySQL token store. When tokens are stored in
> MySQL, using MySQL for the StateStore as well reduces the number of external
> dependencies for routers.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]