sumitagrawl commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1350320402
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,100 @@ private boolean displayTable(ManagedRocksIterator
iterator,
boolean schemaV3)
throws IOException {
- if (fileName == null) {
- // Print to stdout
- return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
- }
-
- // Write to file output
- try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
- return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+ PrintWriter printWriter = null;
+ try {
+ if (fileName != null) {
+ printWriter = new PrintWriter(
+ new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+ } else {
+ printWriter = out();
+ }
+ return displayTable(iterator, dbColumnFamilyDef, printWriter,
+ schemaV3);
+ } finally {
+ if (printWriter != null) {
+ printWriter.close();
+ }
}
}
private boolean displayTable(ManagedRocksIterator iterator,
DBColumnFamilyDefinition dbColumnFamilyDef,
- PrintWriter out,
- boolean schemaV3)
- throws IOException {
+ PrintWriter printWriter, boolean schemaV3) {
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("DBScanner-%d")
+ .build();
+ ExecutorService threadPool = new ThreadPoolExecutor(
+ threadCount, threadCount, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(1024), factory,
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ LogWriter logWriter = new LogWriter(printWriter);
+ try {
+ // Start JSON object (map) or array
+ printWriter.print(withKey ? "{ " : "[ ");
+ logWriter.start();
+ processRecords(iterator, dbColumnFamilyDef, logWriter,
+ threadPool, schemaV3);
+ } catch (InterruptedException e) {
+ exception = true;
+ Thread.currentThread().interrupt();
+ } finally {
+ threadPool.shutdownNow();
+ logWriter.stop();
+ logWriter.join();
+ // End JSON object (map) or array
+ printWriter.println(withKey ? " }" : " ]");
+ }
+ return !exception;
+ }
+ private void processRecords(ManagedRocksIterator iterator,
+ DBColumnFamilyDefinition dbColumnFamilyDef,
+ LogWriter logWriter, ExecutorService threadPool,
+ boolean schemaV3) throws InterruptedException {
if (startKey != null) {
iterator.get().seek(getValueObject(dbColumnFamilyDef));
}
-
- if (withKey) {
- // Start JSON object (map)
- out.print("{ ");
- } else {
- // Start JSON array
- out.print("[ ");
- }
-
+ ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(batchSize);
+ // Used to ensure that the output of a multi-threaded parsed Json is in
+ // the same order as the RocksDB iterator.
+ long sequenceId = FIRST_SEQUENCE_ID;
// Count number of keys printed so far
long count = 0;
- while (withinLimit(count) && iterator.get().isValid()) {
- StringBuilder sb = new StringBuilder();
- if (withKey) {
- Object key = dbColumnFamilyDef.getKeyCodec()
- .fromPersistedFormat(iterator.get().key());
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- if (schemaV3) {
- int index =
- DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
- String keyStr = key.toString();
- if (index > keyStr.length()) {
- err().println("Error: Invalid SchemaV3 table key length. "
- + "Is this a V2 table? Try again with --dn-schema=V2");
- return false;
- }
- String cid = keyStr.substring(0, index);
- String blockId = keyStr.substring(index);
- sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
- FixedLengthStringCodec.string2Bytes(cid)) +
- keySeparatorSchemaV3 +
- blockId));
- } else {
- sb.append(gson.toJson(key));
- }
- sb.append(": ");
- }
-
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- Object o = dbColumnFamilyDef.getValueCodec()
- .fromPersistedFormat(iterator.get().value());
- sb.append(gson.toJson(o));
-
+ List<Future<Void>> futures = new ArrayList<>();
+ while (withinLimit(count) && iterator.get().isValid() && !exception) {
+ batch.add(new ByteArrayKeyValue(
+ iterator.get().key(), iterator.get().value()));
iterator.get().next();
- ++count;
- if (withinLimit(count) && iterator.get().isValid()) {
- // If this is not the last entry, append comma
- sb.append(", ");
+ count++;
+ if (batch.size() >= batchSize) {
+ while (logWriter.getInflightLogCount() > threadCount * 10L
 Review Comment:
OK, so here we are controlling the inflight queue ourselves, since no such limit is otherwise present.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]