This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 1b6dfa791554d08d6ae618dffe533925d4eed7c3
Merge: 81c6ae1657 f76f12baa6
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Thu Aug 31 06:59:31 2023 -0400

    Merge branch '2.1'
    
    This completes the removal of the single-mutation Writer class
    
    This fixes #3735

 .../apache/accumulo/core/clientImpl/Writer.java    | 122 ---------------------
 .../accumulo/server/problems/ProblemReport.java    |  10 +-
 .../accumulo/server/problems/ProblemReports.java   |   5 +-
 .../accumulo/server/util/MetadataTableUtil.java    |  51 +++------
 .../org/apache/accumulo/test/BatchWriterIT.java    |  95 ++++++++++++++++
 .../accumulo/test/MetaConstraintRetryIT.java       |  13 ++-
 6 files changed, 125 insertions(+), 171 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java
index 4a358aee6f,9f124ede54..0000000000
deleted file mode 100644,100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java
+++ /dev/null
@@@ -1,122 -1,55 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *   https://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--package org.apache.accumulo.core.clientImpl;
--
--import static com.google.common.base.Preconditions.checkArgument;
- import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- import static java.nio.charset.StandardCharsets.UTF_8;
- import static java.util.concurrent.TimeUnit.MILLISECONDS;
--
--import org.apache.accumulo.core.client.AccumuloException;
- import org.apache.accumulo.core.client.AccumuloSecurityException;
--import org.apache.accumulo.core.client.TableNotFoundException;
- import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation;
- import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
--import org.apache.accumulo.core.data.Mutation;
--import org.apache.accumulo.core.data.TableId;
- import org.apache.accumulo.core.dataImpl.KeyExtent;
- import org.apache.accumulo.core.rpc.ThriftUtil;
- import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
- import 
org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException;
- import org.apache.accumulo.core.tabletingest.thrift.TDurability;
- import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService;
- import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
- import org.apache.accumulo.core.trace.TraceUtil;
- import org.apache.hadoop.io.Text;
- import org.apache.thrift.TException;
- import org.apache.thrift.TServiceClient;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
- import com.google.common.net.HostAndPort;
 -import org.apache.accumulo.core.metadata.schema.Ample;
--
--public class Writer {
- 
-   private static final Logger log = LoggerFactory.getLogger(Writer.class);
--
--  private ClientContext context;
--  private TableId tableId;
--
--  public Writer(ClientContext context, TableId tableId) {
--    checkArgument(context != null, "context is null");
--    checkArgument(tableId != null, "tableId is null");
--    this.context = context;
--    this.tableId = tableId;
-   }
- 
-   private static void updateServer(ClientContext context, Mutation m, 
KeyExtent extent,
-       HostAndPort server) throws TException, NotServingTabletException,
-       ConstraintViolationException, AccumuloSecurityException {
-     checkArgument(m != null, "m is null");
-     checkArgument(extent != null, "extent is null");
-     checkArgument(server != null, "server is null");
-     checkArgument(context != null, "context is null");
- 
-     TabletIngestClientService.Iface client = null;
-     try {
-       client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server, 
context);
-       client.update(TraceUtil.traceInfo(), context.rpcCreds(), 
extent.toThrift(), m.toThrift(),
-           TDurability.DEFAULT);
-     } catch (ThriftSecurityException e) {
-       throw new AccumuloSecurityException(e.user, e.code);
-     } finally {
-       ThriftUtil.returnClient((TServiceClient) client, context);
-     }
--  }
--
-   public void update(Mutation m) throws AccumuloException, 
AccumuloSecurityException,
-       ConstraintViolationException, TableNotFoundException {
 -  public void update(Mutation m) throws AccumuloException, 
TableNotFoundException {
--    checkArgument(m != null, "m is null");
--
--    if (m.size() == 0) {
--      throw new IllegalArgumentException("Can not add empty mutations");
--    }
- 
-     while (true) {
-       TabletLocation tabLoc = TabletLocator.getLocator(context, 
tableId).locateTablet(context,
-           new Text(m.getRow()), false, true);
- 
-       if (tabLoc == null) {
-         log.trace("No tablet location found for row {}", new 
String(m.getRow(), UTF_8));
-         sleepUninterruptibly(500, MILLISECONDS);
-         continue;
-       }
--
-       final HostAndPort parsedLocation = 
HostAndPort.fromString(tabLoc.getTserverLocation());
-       try {
-         updateServer(context, m, tabLoc.getExtent(), parsedLocation);
-         return;
-       } catch (NotServingTabletException e) {
-         log.trace("Not serving tablet, server = {}", parsedLocation);
-         TabletLocator.getLocator(context, 
tableId).invalidateCache(tabLoc.getExtent());
-       } catch (ConstraintViolationException cve) {
-         log.error("error sending update to {}", parsedLocation, cve);
-         // probably do not need to invalidate cache, but it does not hurt
-         TabletLocator.getLocator(context, 
tableId).invalidateCache(tabLoc.getExtent());
-         throw cve;
-       } catch (TException e) {
-         log.error("error sending update to {}", parsedLocation, e);
-         TabletLocator.getLocator(context, 
tableId).invalidateCache(tabLoc.getExtent());
-       }
 -    String table = Ample.DataLevel.of(tableId).metaTable();
--
-       sleepUninterruptibly(500, MILLISECONDS);
 -    try (var writer = context.createBatchWriter(table)) {
 -      writer.addMutation(m);
--    }
--
--  }
--}
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
index f884287ab3,cdbac5fc9f..3f09dc769e
--- 
a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
@@@ -51,9 -48,7 +51,8 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.server.ServerContext;
- import org.apache.accumulo.server.util.MetadataTableUtil;
  import org.apache.commons.collections4.map.LRUMap;
 +import org.apache.zookeeper.KeeperException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
diff --cc test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java
index 242de8afbf,57c3ec90a9..cd3a727ded
--- a/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java
@@@ -22,13 -28,32 +28,33 @@@ import java.util.stream.Collectors
  
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.AccumuloSecurityException;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.BatchWriterConfig;
+ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+ import org.apache.accumulo.core.clientImpl.ClientContext;
+ import org.apache.accumulo.core.clientImpl.TabletLocator;
+ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
+ import org.apache.accumulo.core.rpc.ThriftUtil;
+ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 -import 
org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 -import org.apache.accumulo.core.tabletserver.thrift.TDurability;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
++import 
org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException;
++import org.apache.accumulo.core.tabletingest.thrift.TDurability;
++import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService;
+ import org.apache.accumulo.core.trace.TraceUtil;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.accumulo.test.constraints.NumericValueConstraint;
+ import org.apache.hadoop.io.Text;
+ import org.apache.thrift.TServiceClient;
  import org.junit.jupiter.api.Test;
  
++import com.google.common.net.HostAndPort;
++
  public class BatchWriterIT extends AccumuloClusterHarness {
  
    @Override
@@@ -52,4 -77,73 +78,73 @@@
      }
    }
  
+   private static void update(ClientContext context, Mutation m, KeyExtent 
extent) throws Exception {
+ 
+     TabletLocator.TabletLocation tabLoc = TabletLocator.getLocator(context, 
extent.tableId())
+         .locateTablet(context, new Text(m.getRow()), false, true);
+ 
 -    var server = HostAndPort.fromString(tabLoc.tablet_location);
++    var server = HostAndPort.fromString(tabLoc.getTserverLocation());
+ 
 -    TabletClientService.Iface client = null;
++    TabletIngestClientService.Iface client = null;
+     try {
 -      client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server, 
context);
++      client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server, 
context);
+       client.update(TraceUtil.traceInfo(), context.rpcCreds(), 
extent.toThrift(), m.toThrift(),
+           TDurability.DEFAULT);
+     } catch (ThriftSecurityException e) {
+       throw new AccumuloSecurityException(e.user, e.code);
+     } finally {
+       ThriftUtil.returnClient((TServiceClient) client, context);
+     }
+   }
+ 
+   static String toString(Map.Entry<Key,Value> e) {
+     return e.getKey().getRow() + ":" + e.getKey().getColumnFamily() + ":"
+         + e.getKey().getColumnQualifier() + ":" + 
e.getKey().getColumnVisibility() + ":"
+         + e.getValue();
+   }
+ 
+   @Test
+   public void testSingleMutationWriteRPC() throws Exception {
+     // The batchwriter used to use this RPC and no longer does. This test 
exist to exercise the
+     // unused RPC until its removed in 3.x. Older client versions of Accumulo 
2.1.x may call this
+     // RPC.
+ 
+     String table = getUniqueNames(1)[0];
+     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+       NewTableConfiguration ntc = new NewTableConfiguration();
+       ntc.setProperties(Map.of(Property.TABLE_CONSTRAINT_PREFIX.getKey() + 
"1",
+           NumericValueConstraint.class.getName()));
+       c.tableOperations().create(table, ntc);
+ 
+       var tableId = TableId.of(c.tableOperations().tableIdMap().get(table));
+ 
+       Mutation m = new Mutation("r1");
+       m.put("f1", "q3", new Value("1"));
+       m.put("f1", "q4", new Value("2"));
+ 
+       update((ClientContext) c, m, new KeyExtent(tableId, null, null));
+ 
+       try (var scanner = c.createScanner(table)) {
+         var entries = 
scanner.stream().map(BatchWriterIT::toString).collect(Collectors.toList());
+         assertEquals(List.of("r1:f1:q3::1", "r1:f1:q4::2"), entries);
+       }
+ 
+       m = new Mutation("r1");
+       m.put("f1", "q3", new Value("5"));
+       m.put("f1", "q7", new Value("3"));
+ 
+       update((ClientContext) c, m, new KeyExtent(tableId, null, null));
+ 
+       try (var scanner = c.createScanner(table)) {
+         var entries = 
scanner.stream().map(BatchWriterIT::toString).collect(Collectors.toList());
+         assertEquals(List.of("r1:f1:q3::5", "r1:f1:q4::2", "r1:f1:q7::3"), 
entries);
+       }
+ 
+       var m2 = new Mutation("r2");
+       m2.put("f1", "q1", new Value("abc"));
+       assertThrows(ConstraintViolationException.class,
+           () -> update((ClientContext) c, m2, new KeyExtent(tableId, null, 
null)));
+     }
+ 
+   }
  }

Reply via email to