This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 1b6dfa791554d08d6ae618dffe533925d4eed7c3 Merge: 81c6ae1657 f76f12baa6 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Thu Aug 31 06:59:31 2023 -0400 Merge branch '2.1' This completes the removal of the single-mutation Writer class This fixes #3735 .../apache/accumulo/core/clientImpl/Writer.java | 122 --------------------- .../accumulo/server/problems/ProblemReport.java | 10 +- .../accumulo/server/problems/ProblemReports.java | 5 +- .../accumulo/server/util/MetadataTableUtil.java | 51 +++------ .../org/apache/accumulo/test/BatchWriterIT.java | 95 ++++++++++++++++ .../accumulo/test/MetaConstraintRetryIT.java | 13 ++- 6 files changed, 125 insertions(+), 171 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java index 4a358aee6f,9f124ede54..0000000000 deleted file mode 100644,100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/Writer.java +++ /dev/null @@@ -1,122 -1,55 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * https://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, -- * software distributed under the License is distributed on an -- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -- * KIND, either express or implied. See the License for the -- * specific language governing permissions and limitations -- * under the License. 
-- */ --package org.apache.accumulo.core.clientImpl; -- --import static com.google.common.base.Preconditions.checkArgument; - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - import static java.nio.charset.StandardCharsets.UTF_8; - import static java.util.concurrent.TimeUnit.MILLISECONDS; -- --import org.apache.accumulo.core.client.AccumuloException; - import org.apache.accumulo.core.client.AccumuloSecurityException; --import org.apache.accumulo.core.client.TableNotFoundException; - import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation; - import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; --import org.apache.accumulo.core.data.Mutation; --import org.apache.accumulo.core.data.TableId; - import org.apache.accumulo.core.dataImpl.KeyExtent; - import org.apache.accumulo.core.rpc.ThriftUtil; - import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; - import org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException; - import org.apache.accumulo.core.tabletingest.thrift.TDurability; - import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService; - import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; - import org.apache.accumulo.core.trace.TraceUtil; - import org.apache.hadoop.io.Text; - import org.apache.thrift.TException; - import org.apache.thrift.TServiceClient; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - - import com.google.common.net.HostAndPort; -import org.apache.accumulo.core.metadata.schema.Ample; -- --public class Writer { - - private static final Logger log = LoggerFactory.getLogger(Writer.class); -- -- private ClientContext context; -- private TableId tableId; -- -- public Writer(ClientContext context, TableId tableId) { -- checkArgument(context != null, "context is null"); -- checkArgument(tableId != null, "tableId is null"); -- this.context = context; -- this.tableId = tableId; - } - - private 
static void updateServer(ClientContext context, Mutation m, KeyExtent extent, - HostAndPort server) throws TException, NotServingTabletException, - ConstraintViolationException, AccumuloSecurityException { - checkArgument(m != null, "m is null"); - checkArgument(extent != null, "extent is null"); - checkArgument(server != null, "server is null"); - checkArgument(context != null, "context is null"); - - TabletIngestClientService.Iface client = null; - try { - client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server, context); - client.update(TraceUtil.traceInfo(), context.rpcCreds(), extent.toThrift(), m.toThrift(), - TDurability.DEFAULT); - } catch (ThriftSecurityException e) { - throw new AccumuloSecurityException(e.user, e.code); - } finally { - ThriftUtil.returnClient((TServiceClient) client, context); - } -- } -- - public void update(Mutation m) throws AccumuloException, AccumuloSecurityException, - ConstraintViolationException, TableNotFoundException { - public void update(Mutation m) throws AccumuloException, TableNotFoundException { -- checkArgument(m != null, "m is null"); -- -- if (m.size() == 0) { -- throw new IllegalArgumentException("Can not add empty mutations"); -- } - - while (true) { - TabletLocation tabLoc = TabletLocator.getLocator(context, tableId).locateTablet(context, - new Text(m.getRow()), false, true); - - if (tabLoc == null) { - log.trace("No tablet location found for row {}", new String(m.getRow(), UTF_8)); - sleepUninterruptibly(500, MILLISECONDS); - continue; - } -- - final HostAndPort parsedLocation = HostAndPort.fromString(tabLoc.getTserverLocation()); - try { - updateServer(context, m, tabLoc.getExtent(), parsedLocation); - return; - } catch (NotServingTabletException e) { - log.trace("Not serving tablet, server = {}", parsedLocation); - TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.getExtent()); - } catch (ConstraintViolationException cve) { - log.error("error sending update to {}", parsedLocation, 
cve); - // probably do not need to invalidate cache, but it does not hurt - TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.getExtent()); - throw cve; - } catch (TException e) { - log.error("error sending update to {}", parsedLocation, e); - TabletLocator.getLocator(context, tableId).invalidateCache(tabLoc.getExtent()); - } - String table = Ample.DataLevel.of(tableId).metaTable(); -- - sleepUninterruptibly(500, MILLISECONDS); - try (var writer = context.createBatchWriter(table)) { - writer.addMutation(m); -- } -- -- } --} diff --cc server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java index f884287ab3,cdbac5fc9f..3f09dc769e --- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java +++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java @@@ -51,9 -48,7 +51,8 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; - import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.commons.collections4.map.LRUMap; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --cc test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java index 242de8afbf,57c3ec90a9..cd3a727ded --- a/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java @@@ -22,13 -28,32 +28,33 @@@ import java.util.stream.Collectors import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; + import org.apache.accumulo.core.client.admin.NewTableConfiguration; + import 
org.apache.accumulo.core.clientImpl.ClientContext; + import org.apache.accumulo.core.clientImpl.TabletLocator; + import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; + import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; + import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.rpc.ThriftUtil; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; -import org.apache.accumulo.core.tabletserver.thrift.TDurability; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; ++import org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException; ++import org.apache.accumulo.core.tabletingest.thrift.TDurability; ++import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService; + import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.harness.AccumuloClusterHarness; + import org.apache.accumulo.test.constraints.NumericValueConstraint; + import org.apache.hadoop.io.Text; + import org.apache.thrift.TServiceClient; import org.junit.jupiter.api.Test; ++import com.google.common.net.HostAndPort; ++ public class BatchWriterIT extends AccumuloClusterHarness { @Override @@@ -52,4 -77,73 +78,73 @@@ } } + private static void update(ClientContext context, Mutation m, KeyExtent extent) throws Exception { + + TabletLocator.TabletLocation tabLoc = TabletLocator.getLocator(context, extent.tableId()) + .locateTablet(context, new Text(m.getRow()), false, true); + - var server = HostAndPort.fromString(tabLoc.tablet_location); ++ var server = HostAndPort.fromString(tabLoc.getTserverLocation()); + - TabletClientService.Iface client = null; ++ 
TabletIngestClientService.Iface client = null; + try { - client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server, context); ++ client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server, context); + client.update(TraceUtil.traceInfo(), context.rpcCreds(), extent.toThrift(), m.toThrift(), + TDurability.DEFAULT); + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code); + } finally { + ThriftUtil.returnClient((TServiceClient) client, context); + } + } + + static String toString(Map.Entry<Key,Value> e) { + return e.getKey().getRow() + ":" + e.getKey().getColumnFamily() + ":" + + e.getKey().getColumnQualifier() + ":" + e.getKey().getColumnVisibility() + ":" + + e.getValue(); + } + + @Test + public void testSingleMutationWriteRPC() throws Exception { + // The batchwriter used to use this RPC and no longer does. This test exists to exercise the + // unused RPC until it's removed in 3.x. Older client versions of Accumulo 2.1.x may call this + // RPC. 
+ + String table = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.setProperties(Map.of(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", + NumericValueConstraint.class.getName())); + c.tableOperations().create(table, ntc); + + var tableId = TableId.of(c.tableOperations().tableIdMap().get(table)); + + Mutation m = new Mutation("r1"); + m.put("f1", "q3", new Value("1")); + m.put("f1", "q4", new Value("2")); + + update((ClientContext) c, m, new KeyExtent(tableId, null, null)); + + try (var scanner = c.createScanner(table)) { + var entries = scanner.stream().map(BatchWriterIT::toString).collect(Collectors.toList()); + assertEquals(List.of("r1:f1:q3::1", "r1:f1:q4::2"), entries); + } + + m = new Mutation("r1"); + m.put("f1", "q3", new Value("5")); + m.put("f1", "q7", new Value("3")); + + update((ClientContext) c, m, new KeyExtent(tableId, null, null)); + + try (var scanner = c.createScanner(table)) { + var entries = scanner.stream().map(BatchWriterIT::toString).collect(Collectors.toList()); + assertEquals(List.of("r1:f1:q3::5", "r1:f1:q4::2", "r1:f1:q7::3"), entries); + } + + var m2 = new Mutation("r2"); + m2.put("f1", "q1", new Value("abc")); + assertThrows(ConstraintViolationException.class, + () -> update((ClientContext) c, m2, new KeyExtent(tableId, null, null))); + } + + } }