xiarixiaoyao commented on code in PR #5064:
URL: https://github.com/apache/hudi/pull/5064#discussion_r885165465


##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/HoodieMetastoreServer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metastore.service.HoodieMetastoreService;
+import org.apache.hudi.metastore.service.HoodieMetaStoreProxyHandler;
+import org.apache.hudi.metastore.service.PartitionService;
+import org.apache.hudi.metastore.service.SnapshotService;
+import org.apache.hudi.metastore.service.TableService;
+import org.apache.hudi.metastore.service.TimelineService;
+import org.apache.hudi.metastore.store.RelationDBBasedStore;
+import org.apache.hudi.metastore.store.MetadataStore;
+import org.apache.hudi.metastore.thrift.MetaStoreException;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.TServerSocketWrapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+
+import java.lang.reflect.Proxy;
+
+public class HoodieMetastoreServer {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetastoreServer.class);
+
+  private static volatile TServer server;
+  private static volatile Thread serverThread;

Review Comment:
   Do we need to declare those fields as volatile?



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/HoodieMetastoreServer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metastore.service.HoodieMetastoreService;
+import org.apache.hudi.metastore.service.HoodieMetaStoreProxyHandler;
+import org.apache.hudi.metastore.service.PartitionService;
+import org.apache.hudi.metastore.service.SnapshotService;
+import org.apache.hudi.metastore.service.TableService;
+import org.apache.hudi.metastore.service.TimelineService;
+import org.apache.hudi.metastore.store.RelationDBBasedStore;
+import org.apache.hudi.metastore.store.MetadataStore;
+import org.apache.hudi.metastore.thrift.MetaStoreException;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.TServerSocketWrapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+
+import java.lang.reflect.Proxy;
+
+public class HoodieMetastoreServer {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetastoreServer.class);
+
+  private static volatile TServer server;
+  private static volatile Thread serverThread;

Review Comment:
   Do we need to declare those fields as volatile?



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/HoodieMetastoreServer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metastore.service.HoodieMetastoreService;
+import org.apache.hudi.metastore.service.HoodieMetaStoreProxyHandler;
+import org.apache.hudi.metastore.service.PartitionService;
+import org.apache.hudi.metastore.service.SnapshotService;
+import org.apache.hudi.metastore.service.TableService;
+import org.apache.hudi.metastore.service.TimelineService;
+import org.apache.hudi.metastore.store.RelationDBBasedStore;
+import org.apache.hudi.metastore.store.MetadataStore;
+import org.apache.hudi.metastore.thrift.MetaStoreException;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.TServerSocketWrapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+
+import java.lang.reflect.Proxy;
+
+public class HoodieMetastoreServer {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetastoreServer.class);
+
+  private static volatile TServer server;
+  private static volatile Thread serverThread;
+  private static volatile MetadataStore metadataStore;
+  private static volatile HoodieMetastoreService metastoreService;
+
+  public static void main(String[] args) {
+    startServer();
+  }
+
+  public static void startServer() {
+    try {
+      if (server != null) {
+        return;
+      }
+      metadataStore = new RelationDBBasedStore();
+      // service
+      TableService tableService = new TableService(metadataStore);
+      PartitionService partitionService = new PartitionService(metadataStore);
+      TimelineService timelineService = new TimelineService(metadataStore);
+      SnapshotService snapshotService = new SnapshotService(metadataStore);
+      HoodieMetastoreService hoodieMetastoreService = new 
HoodieMetastoreService(tableService,
+          partitionService, timelineService, snapshotService);
+      HoodieMetaStoreProxyHandler proxyHandler = new 
HoodieMetaStoreProxyHandler(hoodieMetastoreService);
+
+      // start a thrift server
+      ThriftHoodieMetastore.Iface proxy = (ThriftHoodieMetastore.Iface) Proxy
+          .newProxyInstance(HoodieMetaStoreProxyHandler.class.getClassLoader(),
+              new Class[]{ThriftHoodieMetastore.Iface.class}, proxyHandler);
+      ThriftHoodieMetastore.Processor processor = new 
ThriftHoodieMetastore.Processor(proxy);
+      TServerTransport serverTransport = new TServerSocketWrapper(9090);

Review Comment:
   Let's make the port configurable



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/HoodieMetastoreServer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metastore.service.HoodieMetastoreService;
+import org.apache.hudi.metastore.service.HoodieMetaStoreProxyHandler;
+import org.apache.hudi.metastore.service.PartitionService;
+import org.apache.hudi.metastore.service.SnapshotService;
+import org.apache.hudi.metastore.service.TableService;
+import org.apache.hudi.metastore.service.TimelineService;
+import org.apache.hudi.metastore.store.RelationDBBasedStore;
+import org.apache.hudi.metastore.store.MetadataStore;
+import org.apache.hudi.metastore.thrift.MetaStoreException;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.TServerSocketWrapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+
+import java.lang.reflect.Proxy;
+
+public class HoodieMetastoreServer {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetastoreServer.class);
+
+  private static volatile TServer server;
+  private static volatile Thread serverThread;
+  private static volatile MetadataStore metadataStore;
+  private static volatile HoodieMetastoreService metastoreService;
+
+  public static void main(String[] args) {
+    startServer();
+  }
+
+  public static void startServer() {
+    try {
+      if (server != null) {
+        return;
+      }
+      metadataStore = new RelationDBBasedStore();
+      // service
+      TableService tableService = new TableService(metadataStore);
+      PartitionService partitionService = new PartitionService(metadataStore);
+      TimelineService timelineService = new TimelineService(metadataStore);
+      SnapshotService snapshotService = new SnapshotService(metadataStore);
+      HoodieMetastoreService hoodieMetastoreService = new 
HoodieMetastoreService(tableService,
+          partitionService, timelineService, snapshotService);
+      HoodieMetaStoreProxyHandler proxyHandler = new 
HoodieMetaStoreProxyHandler(hoodieMetastoreService);
+
+      // start a thrift server
+      ThriftHoodieMetastore.Iface proxy = (ThriftHoodieMetastore.Iface) Proxy
+          .newProxyInstance(HoodieMetaStoreProxyHandler.class.getClassLoader(),
+              new Class[]{ThriftHoodieMetastore.Iface.class}, proxyHandler);
+      ThriftHoodieMetastore.Processor processor = new 
ThriftHoodieMetastore.Processor(proxy);
+      TServerTransport serverTransport = new TServerSocketWrapper(9090);
+      server = new TThreadPoolServer(new 
TThreadPoolServer.Args(serverTransport).processor(processor));
+      LOG.info("Starting the server");
+      serverThread = new Thread(() -> server.serve());
+      serverThread.start();
+    } catch (Exception e) {
+      LOG.error("Fail to start the server", e);
+      System.exit(1);
+    }
+  }
+
+  public static ThriftHoodieMetastore.Iface getEmbeddedMetastore() {
+    if (metadataStore == null) {
+      synchronized (HoodieMetastoreServer.class) {
+        if (metadataStore == null) {
+          // todo: add metastore factory.

Review Comment:
   todo -> TODO



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/HoodieMetastoreServer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metastore.service.HoodieMetastoreService;
+import org.apache.hudi.metastore.service.HoodieMetaStoreProxyHandler;
+import org.apache.hudi.metastore.service.PartitionService;
+import org.apache.hudi.metastore.service.SnapshotService;
+import org.apache.hudi.metastore.service.TableService;
+import org.apache.hudi.metastore.service.TimelineService;
+import org.apache.hudi.metastore.store.RelationDBBasedStore;
+import org.apache.hudi.metastore.store.MetadataStore;
+import org.apache.hudi.metastore.thrift.MetaStoreException;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.TServerSocketWrapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+
+import java.lang.reflect.Proxy;
+
+public class HoodieMetastoreServer {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetastoreServer.class);
+
+  private static volatile TServer server;
+  private static volatile Thread serverThread;
+  private static volatile MetadataStore metadataStore;
+  private static volatile HoodieMetastoreService metastoreService;
+
+  public static void main(String[] args) {
+    startServer();
+  }
+
+  public static void startServer() {
+    try {
+      if (server != null) {
+        return;
+      }
+      metadataStore = new RelationDBBasedStore();
+      // service
+      TableService tableService = new TableService(metadataStore);
+      PartitionService partitionService = new PartitionService(metadataStore);
+      TimelineService timelineService = new TimelineService(metadataStore);
+      SnapshotService snapshotService = new SnapshotService(metadataStore);
+      HoodieMetastoreService hoodieMetastoreService = new 
HoodieMetastoreService(tableService,
+          partitionService, timelineService, snapshotService);
+      HoodieMetaStoreProxyHandler proxyHandler = new 
HoodieMetaStoreProxyHandler(hoodieMetastoreService);
+
+      // start a thrift server
+      ThriftHoodieMetastore.Iface proxy = (ThriftHoodieMetastore.Iface) Proxy
+          .newProxyInstance(HoodieMetaStoreProxyHandler.class.getClassLoader(),
+              new Class[]{ThriftHoodieMetastore.Iface.class}, proxyHandler);
+      ThriftHoodieMetastore.Processor processor = new 
ThriftHoodieMetastore.Processor(proxy);
+      TServerTransport serverTransport = new TServerSocketWrapper(9090);

Review Comment:
   Let's make the port configurable



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/HoodieMetastoreClientImp.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metastore.HoodieMetastoreServer;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.EntryConvertor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HoodieMetastoreClientImp implements HoodieMetastoreClient, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetastoreClientImp.class);
+  private final HoodieMetastoreConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetastore.Iface client;
+  private TTransport transport;
+
+  public HoodieMetastoreClientImp(HoodieMetastoreConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetastoreUris();
+    if (isLocalEmbeddedMetastore(uri)) {
+      this.client = HoodieMetastoreServer.getEmbeddedMetastore();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {

Review Comment:
   We should throw an exception for this function.



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/HoodieMetastoreClientImp.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metastore.HoodieMetastoreServer;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.EntryConvertor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HoodieMetastoreClientImp implements HoodieMetastoreClient, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetastoreClientImp.class);
+  private final HoodieMetastoreConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetastore.Iface client;
+  private TTransport transport;
+
+  public HoodieMetastoreClientImp(HoodieMetastoreConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetastoreUris();
+    if (isLocalEmbeddedMetastore(uri)) {
+      this.client = HoodieMetastoreServer.getEmbeddedMetastore();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {
+    String uri = config.getMetastoreUris();
+    TTransportException exception = null;
+    for (int i = 0; i < retryLimit; i++) {

Review Comment:
   for (int i = 0; !isConnected && i < retryLimit; i++)



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/HoodieMetastoreClientImp.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metastore.HoodieMetastoreServer;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.EntryConvertor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HoodieMetastoreClientImp implements HoodieMetastoreClient, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetastoreClientImp.class);
+  private final HoodieMetastoreConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetastore.Iface client;
+  private TTransport transport;
+
+  public HoodieMetastoreClientImp(HoodieMetastoreConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetastoreUris();
+    if (isLocalEmbeddedMetastore(uri)) {
+      this.client = HoodieMetastoreServer.getEmbeddedMetastore();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {
+    String uri = config.getMetastoreUris();
+    TTransportException exception = null;
+    for (int i = 0; i < retryLimit; i++) {

Review Comment:
   for (int i = 0; !isConnected && i < retryLimit; i++)



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/HoodieMetastoreClientImp.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metastore.HoodieMetastoreServer;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.EntryConvertor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HoodieMetastoreClientImp implements HoodieMetastoreClient, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetastoreClientImp.class);
+  private final HoodieMetastoreConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetastore.Iface client;
+  private TTransport transport;
+
+  public HoodieMetastoreClientImp(HoodieMetastoreConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetastoreUris();
+    if (isLocalEmbeddedMetastore(uri)) {
+      this.client = HoodieMetastoreServer.getEmbeddedMetastore();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {
+    String uri = config.getMetastoreUris();
+    TTransportException exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        URI msUri = new URI(uri);
+        this.transport = new TSocket(msUri.getHost(), msUri.getPort());
+        this.client = new ThriftHoodieMetastore.Client(new 
TBinaryProtocol(transport));
+        transport.open();
+        this.isConnected = true;
+        LOG.info("Connected to metastore: " + msUri);
+      } catch (URISyntaxException e) {
+        throw new HoodieException("Invalid metastore uri: " + uri, e);
+      } catch (TTransportException e) {
+        exception = e;
+        LOG.warn("Fail to connect to the metastore.", e);
+      }
+    }
+    if (!isConnected) {
+      throw new HoodieException("Fail to connect to the metastore.", 
exception);
+    }
+  }
+
+  private boolean isLocalEmbeddedMetastore(String uri) {
+    return uri == null ? true : uri.trim().isEmpty();
+  }
+
+  private void reconnect() {
+    this.close();
+  }
+
+  @Override
+  public Table getTable(String db, String tb) {
+    return exceptionWrapper(() -> this.client.get_table(db, tb)).get();
+  }
+
+  @Override
+  public void createTable(Table table) {
+    try {
+      this.client.create_table(table);
+    } catch (TException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  public List<HoodieInstant> listInstants(String db, String tb, int commitNum) 
{
+    return exceptionWrapper(() -> this.client.list_instants(db, tb, 
commitNum).stream()
+        .map(EntryConvertor::fromTHoodieInstant)
+        .collect(Collectors.toList())).get();
+  }
+
+  public Option<byte[]> getInstantMeta(String db, String tb, HoodieInstant 
instant) {
+    ByteBuffer bytes = exceptionWrapper(() -> this.client.get_instant_meta(db, 
tb, EntryConvertor.toTHoodieInstant(instant))).get();
+    Option<byte[]> res = bytes.capacity() == 0 ? Option.empty() : 
Option.of(bytes.array());
+    return res;
+  }
+
+  public String createNewTimestamp(String db, String tb) {
+    return exceptionWrapper(() -> this.client.create_new_instant_time(db, 
tb)).get();
+  }
+
+  public void createNewInstant(String db, String tb, HoodieInstant instant, 
Option<byte[]> content) {
+    exceptionWrapper(() -> this.client.create_new_instant_with_time(db, tb, 
EntryConvertor.toTHoodieInstant(instant), getByteBuffer(content))).get();
+  }
+
+  public void transitionInstantState(String db, String tb, HoodieInstant 
fromInstant, HoodieInstant toInstant, Option<byte[]> content) {
+    exceptionWrapper(() -> this.client.transition_instant_state(db, tb,
+        EntryConvertor.toTHoodieInstant(fromInstant),
+        EntryConvertor.toTHoodieInstant(toInstant),
+        getByteBuffer(content))).get();
+  }
+
+  public void deleteInstant(String db, String tb, HoodieInstant instant) {
+    exceptionWrapper(() -> this.client.delete_instant(db, tb, 
EntryConvertor.toTHoodieInstant(instant))).get();
+  }
+
+  public FileStatus[] listFilesInPartition(String db, String tb, String 
partition, String timestamp) {
+    return null;

Review Comment:
   Please add a TODO, and delete line 149.



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/HoodieMetastoreClientImp.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metastore.HoodieMetastoreServer;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.EntryConvertor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HoodieMetastoreClientImp implements HoodieMetastoreClient, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetastoreClientImp.class);
+  private final HoodieMetastoreConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetastore.Iface client;
+  private TTransport transport;
+
+  public HoodieMetastoreClientImp(HoodieMetastoreConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetastoreUris();
+    if (isLocalEmbeddedMetastore(uri)) {
+      this.client = HoodieMetastoreServer.getEmbeddedMetastore();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {
+    String uri = config.getMetastoreUris();
+    TTransportException exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        URI msUri = new URI(uri);
+        this.transport = new TSocket(msUri.getHost(), msUri.getPort());
+        this.client = new ThriftHoodieMetastore.Client(new 
TBinaryProtocol(transport));
+        transport.open();
+        this.isConnected = true;
+        LOG.info("Connected to metastore: " + msUri);
+      } catch (URISyntaxException e) {
+        throw new HoodieException("Invalid metastore uri: " + uri, e);
+      } catch (TTransportException e) {
+        exception = e;
+        LOG.warn("Fail to connect to the metastore.", e);
+      }
+    }
+    if (!isConnected) {
+      throw new HoodieException("Fail to connect to the metastore.", 
exception);
+    }
+  }
+
+  private boolean isLocalEmbeddedMetastore(String uri) {
+    return uri == null ? true : uri.trim().isEmpty();
+  }
+
+  private void reconnect() {
+    this.close();

Review Comment:
   Why does this only call close()? Did you forget to call open()?



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/HoodieMetastoreClientImp.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metastore.HoodieMetastoreServer;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.EntryConvertor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HoodieMetastoreClientImp implements HoodieMetastoreClient, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetastoreClientImp.class);
+  private final HoodieMetastoreConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetastore.Iface client;
+  private TTransport transport;
+
+  public HoodieMetastoreClientImp(HoodieMetastoreConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetastoreUris();
+    if (isLocalEmbeddedMetastore(uri)) {
+      this.client = HoodieMetastoreServer.getEmbeddedMetastore();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {
+    String uri = config.getMetastoreUris();
+    TTransportException exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        URI msUri = new URI(uri);
+        this.transport = new TSocket(msUri.getHost(), msUri.getPort());
+        this.client = new ThriftHoodieMetastore.Client(new 
TBinaryProtocol(transport));
+        transport.open();
+        this.isConnected = true;
+        LOG.info("Connected to metastore: " + msUri);
+      } catch (URISyntaxException e) {
+        throw new HoodieException("Invalid metastore uri: " + uri, e);
+      } catch (TTransportException e) {
+        exception = e;
+        LOG.warn("Fail to connect to the metastore.", e);
+      }
+    }
+    if (!isConnected) {
+      throw new HoodieException("Fail to connect to the metastore.", 
exception);
+    }
+  }
+
+  private boolean isLocalEmbeddedMetastore(String uri) {
+    return uri == null ? true : uri.trim().isEmpty();
+  }
+
+  private void reconnect() {
+    this.close();

Review Comment:
   Why does this only call close()? Did you forget to call open()?



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/HoodieMetastoreClientImp.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metastore.HoodieMetastoreServer;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.EntryConvertor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HoodieMetastoreClientImp implements HoodieMetastoreClient, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetastoreClientImp.class);
+  private final HoodieMetastoreConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetastore.Iface client;
+  private TTransport transport;
+
+  public HoodieMetastoreClientImp(HoodieMetastoreConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetastoreUris();
+    if (isLocalEmbeddedMetastore(uri)) {
+      this.client = HoodieMetastoreServer.getEmbeddedMetastore();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {
+    String uri = config.getMetastoreUris();
+    TTransportException exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        URI msUri = new URI(uri);
+        this.transport = new TSocket(msUri.getHost(), msUri.getPort());
+        this.client = new ThriftHoodieMetastore.Client(new 
TBinaryProtocol(transport));
+        transport.open();
+        this.isConnected = true;
+        LOG.info("Connected to metastore: " + msUri);
+      } catch (URISyntaxException e) {
+        throw new HoodieException("Invalid metastore uri: " + uri, e);
+      } catch (TTransportException e) {
+        exception = e;
+        LOG.warn("Fail to connect to the metastore.", e);
+      }
+    }
+    if (!isConnected) {
+      throw new HoodieException("Fail to connect to the metastore.", 
exception);
+    }
+  }
+
+  private boolean isLocalEmbeddedMetastore(String uri) {
+    return uri == null ? true : uri.trim().isEmpty();
+  }
+
+  private void reconnect() {
+    this.close();
+  }
+
+  @Override
+  public Table getTable(String db, String tb) {
+    return exceptionWrapper(() -> this.client.get_table(db, tb)).get();
+  }
+
+  @Override
+  public void createTable(Table table) {
+    try {
+      this.client.create_table(table);
+    } catch (TException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  public List<HoodieInstant> listInstants(String db, String tb, int commitNum) 
{
+    return exceptionWrapper(() -> this.client.list_instants(db, tb, 
commitNum).stream()
+        .map(EntryConvertor::fromTHoodieInstant)
+        .collect(Collectors.toList())).get();
+  }
+
+  public Option<byte[]> getInstantMeta(String db, String tb, HoodieInstant 
instant) {
+    ByteBuffer bytes = exceptionWrapper(() -> this.client.get_instant_meta(db, 
tb, EntryConvertor.toTHoodieInstant(instant))).get();
+    Option<byte[]> res = bytes.capacity() == 0 ? Option.empty() : 
Option.of(bytes.array());
+    return res;
+  }
+
+  public String createNewTimestamp(String db, String tb) {
+    return exceptionWrapper(() -> this.client.create_new_instant_time(db, 
tb)).get();
+  }
+
+  public void createNewInstant(String db, String tb, HoodieInstant instant, 
Option<byte[]> content) {
+    exceptionWrapper(() -> this.client.create_new_instant_with_time(db, tb, 
EntryConvertor.toTHoodieInstant(instant), getByteBuffer(content))).get();
+  }
+
+  public void transitionInstantState(String db, String tb, HoodieInstant 
fromInstant, HoodieInstant toInstant, Option<byte[]> content) {
+    exceptionWrapper(() -> this.client.transition_instant_state(db, tb,
+        EntryConvertor.toTHoodieInstant(fromInstant),
+        EntryConvertor.toTHoodieInstant(toInstant),
+        getByteBuffer(content))).get();
+  }
+
+  public void deleteInstant(String db, String tb, HoodieInstant instant) {
+    exceptionWrapper(() -> this.client.delete_instant(db, tb, 
EntryConvertor.toTHoodieInstant(instant))).get();
+  }
+
+  public FileStatus[] listFilesInPartition(String db, String tb, String 
partition, String timestamp) {
+    return null;

Review Comment:
   Please add a TODO here.



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/HoodieMetastoreServer.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metastore.service.HoodieMetastoreService;
+import org.apache.hudi.metastore.service.HoodieMetaStoreProxyHandler;
+import org.apache.hudi.metastore.service.PartitionService;
+import org.apache.hudi.metastore.service.SnapshotService;
+import org.apache.hudi.metastore.service.TableService;
+import org.apache.hudi.metastore.service.TimelineService;
+import org.apache.hudi.metastore.store.RelationDBBasedStore;
+import org.apache.hudi.metastore.store.MetadataStore;
+import org.apache.hudi.metastore.thrift.MetaStoreException;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.TServerSocketWrapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+
+import java.lang.reflect.Proxy;
+
+public class HoodieMetastoreServer {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetastoreServer.class);
+
+  private static volatile TServer server;
+  private static volatile Thread serverThread;
+  private static volatile MetadataStore metadataStore;
+  private static volatile HoodieMetastoreService metastoreService;
+
+  public static void main(String[] args) {
+    startServer();
+  }
+
+  public static void startServer() {
+    try {
+      if (server != null) {
+        return;
+      }
+      metadataStore = new RelationDBBasedStore();
+      // service
+      TableService tableService = new TableService(metadataStore);
+      PartitionService partitionService = new PartitionService(metadataStore);
+      TimelineService timelineService = new TimelineService(metadataStore);
+      SnapshotService snapshotService = new SnapshotService(metadataStore);
+      HoodieMetastoreService hoodieMetastoreService = new 
HoodieMetastoreService(tableService,
+          partitionService, timelineService, snapshotService);
+      HoodieMetaStoreProxyHandler proxyHandler = new 
HoodieMetaStoreProxyHandler(hoodieMetastoreService);
+
+      // start a thrift server
+      ThriftHoodieMetastore.Iface proxy = (ThriftHoodieMetastore.Iface) Proxy
+          .newProxyInstance(HoodieMetaStoreProxyHandler.class.getClassLoader(),
+              new Class[]{ThriftHoodieMetastore.Iface.class}, proxyHandler);
+      ThriftHoodieMetastore.Processor processor = new 
ThriftHoodieMetastore.Processor(proxy);
+      TServerTransport serverTransport = new TServerSocketWrapper(9090);
+      server = new TThreadPoolServer(new 
TThreadPoolServer.Args(serverTransport).processor(processor));
+      LOG.info("Starting the server");
+      serverThread = new Thread(() -> server.serve());
+      serverThread.start();
+    } catch (Exception e) {
+      LOG.error("Fail to start the server", e);
+      System.exit(1);
+    }
+  }
+
+  public static ThriftHoodieMetastore.Iface getEmbeddedMetastore() {
+    if (metadataStore == null) {
+      synchronized (HoodieMetastoreServer.class) {
+        if (metadataStore == null) {
+          // todo: add metastore factory.

Review Comment:
   todo -> TODO



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/HoodieMetastoreClientImp.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metastore.HoodieMetastoreServer;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.hudi.metastore.util.EntryConvertor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class HoodieMetastoreClientImp implements HoodieMetastoreClient, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetastoreClientImp.class);
+  private final HoodieMetastoreConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetastore.Iface client;
+  private TTransport transport;
+
+  public HoodieMetastoreClientImp(HoodieMetastoreConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetastoreUris();
+    if (isLocalEmbeddedMetastore(uri)) {
+      this.client = HoodieMetastoreServer.getEmbeddedMetastore();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {

Review Comment:
   It would be better to throw an exception from this function.



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/RetryingHoodieMetastoreClient.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+
+public class RetryingHoodieMetastoreClient implements InvocationHandler, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(RetryingHoodieMetastoreClient.class);
+  private final HoodieMetastoreClient client;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+
+  private RetryingHoodieMetastoreClient(HoodieMetastoreConfig config) {
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    this.client = new HoodieMetastoreClientImp(config);
+  }
+
+  public static HoodieMetastoreClient getProxy(HoodieMetastoreConfig config) {
+    RetryingHoodieMetastoreClient handler = new 
RetryingHoodieMetastoreClient(config);
+    return (HoodieMetastoreClient) 
Proxy.newProxyInstance(RetryingHoodieMetastoreClient.class.getClassLoader(),
+        new Class[]{HoodieMetastoreClient.class}, handler);
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+    int retry = 0;
+    Throwable err = null;
+    do {
+      try {
+        Object res = method.invoke(client, args);

Review Comment:
   It would be better to log the time cost here.



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/service/HoodieMetastoreService.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.service;
+
+import org.apache.hudi.metastore.thrift.HoodieInstantChangeResult;
+import org.apache.hudi.metastore.thrift.THoodieInstant;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.thrift.TException;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class HoodieMetastoreService implements ThriftHoodieMetastore.Iface, 
Serializable {

Review Comment:
   Please add documentation for all new classes.



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/client/RetryingHoodieMetastoreClient.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.client;
+
+import org.apache.hudi.common.config.HoodieMetastoreConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+
+public class RetryingHoodieMetastoreClient implements InvocationHandler, 
Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(RetryingHoodieMetastoreClient.class);
+  private final HoodieMetastoreClient client;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+
+  private RetryingHoodieMetastoreClient(HoodieMetastoreConfig config) {
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    this.client = new HoodieMetastoreClientImp(config);
+  }
+
+  public static HoodieMetastoreClient getProxy(HoodieMetastoreConfig config) {
+    RetryingHoodieMetastoreClient handler = new 
RetryingHoodieMetastoreClient(config);
+    return (HoodieMetastoreClient) 
Proxy.newProxyInstance(RetryingHoodieMetastoreClient.class.getClassLoader(),
+        new Class[]{HoodieMetastoreClient.class}, handler);
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+    int retry = 0;
+    Throwable err = null;
+    do {
+      try {
+        Object res = method.invoke(client, args);

Review Comment:
   It would be better to log the time cost here.



##########
hudi-metastore/src/main/java/org/apache/hudi/metastore/service/HoodieMetastoreService.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metastore.service;
+
+import org.apache.hudi.metastore.thrift.HoodieInstantChangeResult;
+import org.apache.hudi.metastore.thrift.THoodieInstant;
+import org.apache.hudi.metastore.thrift.Table;
+import org.apache.hudi.metastore.thrift.ThriftHoodieMetastore;
+import org.apache.thrift.TException;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class HoodieMetastoreService implements ThriftHoodieMetastore.Iface, 
Serializable {

Review Comment:
   Please add documentation for all new classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to