thetumbled commented on PR #19781:
URL: https://github.com/apache/pulsar/pull/19781#issuecomment-1463402425

   BTW, this PR does not contain the code for reading content from topics and 
checking data integrity, because I implemented it outside of the Pulsar project. The 
detailed code is as follows:
   ```
   package org.example.utils;
   
   public class PulsarCommonUtils {
       private static final Logger log = 
LoggerFactory.getLogger(PulsarCommonUtils.class);
   
       @Parameter(names = "-topic", description = "topic name")
       String topic = "persistent://test/test/testTxn10";
   
       @Parameter(names = "-sub", description = "subscription name")
       String sub = "sub";
   
       @Parameter(names = { "-u", "--service-url" }, description = "Pulsar 
Service URL")
       public String serviceURL;
   
       @Parameter(names = "-token", description ="token used to authenticate.")
       public String token = "";
   
       @Parameter(names = "--help", description = "help info", help = true)
       private boolean help = false;
   
   
       // check the content of topic whether constitute a continuous range
       public static boolean checkCorrectness(PulsarCommonUtils 
pulsarCommonUtils) throws PulsarClientException {
           boolean result = true;
           if (pulsarCommonUtils.serviceURL == null || 
pulsarCommonUtils.serviceURL.length() == 0) {
               log.error("please input Pulsar Service URL");
               return false;
           }
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl(pulsarCommonUtils.serviceURL)
                   
.authentication(AuthenticationFactory.token(pulsarCommonUtils.token))
                   .build();
           Consumer<byte[]> consumer = client.newConsumer()
                   .topic(pulsarCommonUtils.topic)
                   .subscriptionName(pulsarCommonUtils.sub)
                   .subscribe();
           RangeSet<Long> rangeSet = TreeRangeSet.create();
           Message msg;
           Long id;
           long msgCount = 0l;
           long duplicateCount = 0l;
           while (true) {
               msg = consumer.receive(10, TimeUnit.SECONDS);
               if (msg == null) {
                   break;
               }
               msgCount++;
               id = CommonUtils.bytesToLong(msg.getData());
               if (rangeSet.contains(id)) {
                   duplicateCount++;
               } else {
                   // for n, we add range [n,n+1) into rangeSet.
                   rangeSet.add(Range.closedOpen(id, id + 1));
               }
               consumer.acknowledge(msg);
           }
           // check for message loss
           Set<Range<Long>> set = rangeSet.asRanges();
           if (set.size() > 1) {
               long validCount = 0;
               for (Range<Long> range : set) {
                   validCount += range.upperEndpoint() - range.lowerEndpoint();
               }
               log.info("Some messages are missing! more than one range 
exists." +
                       "Valid messages Count:{} RangeSet:{}", validCount, 
rangeSet);
               result = false;
           }else if (set.size() == 0) {
               log.info("there is no any message!");
               return true;
           }else{
               log.info("only one range exists. RangeSet:{}", rangeSet);
           }
   
           // check for message duplication
           if (duplicateCount != 0) {
               result = false;
           }
           log.info("duplicate message count:{}, total message count:{}, 
duplicate rate:{}%",
                   duplicateCount, msgCount, 100.0 * duplicateCount / msgCount);
   
   
           client.close();
           return result;
       }
   
       public static void main(String[] args) {
           PulsarCommonUtils pulsarCommonUtils = new PulsarCommonUtils();
           JCommander jcommander = 
JCommander.newBuilder().addObject(pulsarCommonUtils).build();
           try {
               jcommander.parse(args);
               if (pulsarCommonUtils.help) {
                   jcommander.usage();
                   System.exit(1);
               } else {
                   if (pulsarCommonUtils.checkCorrectness) {
                       checkCorrectness(pulsarCommonUtils);
                   } 
               }
           } catch (Exception e) {
               System.err.println(e);
               System.exit(1);
           }
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to