Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146628048
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message> kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " 
not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " 
not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupID = message.getGroupID();
    +         if (groupID != null) {
    +            List<PartitionInfo> partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupID.getData())) % numPartitions;
    --- End diff ---
    
    I have an idea for handling this with a custom partitioner and a custom serialiser as an alternative.
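    
    For illustration, a minimal sketch of what that custom partitioner could look like, assuming the bridge sets the group ID bytes as the record key (GroupIdPartitioner is a hypothetical name, not something in this PR):
    
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;
    
    // Hypothetical sketch (not part of this PR): assumes the bridge puts the
    // group ID bytes on the record key so the partitioner can hash them.
    public class GroupIdPartitioner implements Partitioner {
    
       @Override
       public void configure(Map<String, ?> configs) {
          // nothing to configure in this sketch
       }
    
       @Override
       public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
          List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
          int numPartitions = partitions.size();
          if (keyBytes == null) {
             // no group ID on the record: spread messages across partitions
             return ThreadLocalRandom.current().nextInt(numPartitions);
          }
          // same murmur2 hash the bridge currently computes inline in handle()
          return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
       }
    
       @Override
       public void close() {
       }
    }
    
    The bridge would then register it through the standard producer property instead of computing the partition itself, and a custom serialiser could similarly take over mapping the Artemis message onto the record:
    
    configuration.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, GroupIdPartitioner.class.getName());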

