[ 
https://issues.apache.org/jira/browse/NUTCH-1480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154573#comment-16154573
 ] 

ASF GitHub Bot commented on NUTCH-1480:
---------------------------------------

jorgelbg commented on a change in pull request #218: fix for NUTCH-1480 
contributed by r0ann3l
URL: https://github.com/apache/nutch/pull/218#discussion_r137141458
 
 

 ##########
 File path: 
src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java
 ##########
 @@ -31,164 +31,182 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 public class RabbitIndexWriter implements IndexWriter {
 
-    private String serverHost;
-    private int serverPort;
-    private String serverVirtualHost;
-    private String serverUsername;
-    private String serverPassword;
+  private String serverHost;
+  private int serverPort;
+  private String serverVirtualHost;
+  private String serverUsername;
+  private String serverPassword;
 
-    private String exchangeServer;
-    private String exchangeType;
+  private String exchangeServer;
+  private String exchangeType;
 
-    private String queueName;
-    private boolean queueDurable;
-    private String queueRoutingKey;
+  private String queueName;
+  private boolean queueDurable;
+  private String queueRoutingKey;
 
-    private int commitSize;
+  private int commitSize;
 
-    public static final Logger LOG = 
LoggerFactory.getLogger(RabbitIndexWriter.class);
+  private static final Logger LOG = LoggerFactory
+          .getLogger(MethodHandles.lookup().lookupClass());
 
-    private Configuration config;
+  private Configuration config;
 
-    private RabbitMessage rabbitMessage = new RabbitMessage();
+  private RabbitMessage rabbitMessage = new RabbitMessage();
 
-    private Channel channel;
-    private Connection connection;
+  private Channel channel;
+  private Connection connection;
 
-    @Override
-    public Configuration getConf() {
-        return config;
-    }
+  @Override
+  public Configuration getConf() {
+    return config;
+  }
 
-    @Override
-    public void setConf(Configuration conf) {
-        config = conf;
+  @Override
+  public void setConf(Configuration conf) {
+    config = conf;
+  }
 
-        serverHost = conf.get(RabbitMQConstants.SERVER_HOST, "localhost");
-        serverPort = conf.getInt(RabbitMQConstants.SERVER_PORT, 15672);
-        serverVirtualHost = conf.get(RabbitMQConstants.SERVER_VIRTUAL_HOST, 
null);
+  @Override
+  public void open(JobConf JobConf, String name) throws IOException {
+    //Implementation not required
+  }
 
-        serverUsername = conf.get(RabbitMQConstants.SERVER_USERNAME, "admin");
-        serverPassword = conf.get(RabbitMQConstants.SERVER_PASSWORD, "admin");
+  /**
+   * Initializes the internal variables from a given index writer 
configuration.
+   *
+   * @param parameters Params from the index writer configuration.
+   * @throws IOException Some exception thrown by writer.
+   */
+  @Override
+  public void open(Map<String, String> parameters) throws IOException {
+    serverHost = parameters.getOrDefault(RabbitMQConstants.SERVER_HOST, 
"localhost");
+    serverPort = 
Integer.parseInt(parameters.getOrDefault(RabbitMQConstants.SERVER_PORT, 
"5672"));
+    serverVirtualHost = 
parameters.getOrDefault(RabbitMQConstants.SERVER_VIRTUAL_HOST, null);
 
-        exchangeServer = conf.get(RabbitMQConstants.EXCHANGE_SERVER, 
"nutch.exchange");
-        exchangeType = conf.get(RabbitMQConstants.EXCHANGE_TYPE, "direct");
+    serverUsername = 
parameters.getOrDefault(RabbitMQConstants.SERVER_USERNAME, "admin");
+    serverPassword = 
parameters.getOrDefault(RabbitMQConstants.SERVER_PASSWORD, "admin");
 
-        queueName = conf.get(RabbitMQConstants.QUEUE_NAME, "nutch.queue");
-        queueDurable = conf.getBoolean(RabbitMQConstants.QUEUE_DURABLE, true);
-        queueRoutingKey = conf.get(RabbitMQConstants.QUEUE_ROUTING_KEY, 
"nutch.key");
+    exchangeServer = 
parameters.getOrDefault(RabbitMQConstants.EXCHANGE_SERVER, "nutch.exchange");
+    exchangeType = parameters.getOrDefault(RabbitMQConstants.EXCHANGE_TYPE, 
"direct");
 
-        commitSize = conf.getInt(RabbitMQConstants.COMMIT_SIZE, 250);
-    }
+    queueName = parameters.getOrDefault(RabbitMQConstants.QUEUE_NAME, 
"nutch.queue");
+    queueDurable = 
Boolean.parseBoolean(parameters.getOrDefault(RabbitMQConstants.QUEUE_DURABLE, 
"true"));
+    queueRoutingKey = 
parameters.getOrDefault(RabbitMQConstants.QUEUE_ROUTING_KEY, "nutch.key");
 
-    @Override
-    public void open(JobConf JobConf, String name) throws IOException {
-        ConnectionFactory factory = new ConnectionFactory();
-        factory.setHost(serverHost);
-        factory.setPort(serverPort);
+    commitSize = 
Integer.parseInt(parameters.getOrDefault(RabbitMQConstants.COMMIT_SIZE, "250"));
 
-        if(serverVirtualHost != null) {
-            factory.setVirtualHost(serverVirtualHost);
-        }
+    ConnectionFactory factory = new ConnectionFactory();
+    factory.setHost(serverHost);
+    factory.setPort(serverPort);
 
-        factory.setUsername(serverUsername);
-        factory.setPassword(serverPassword);
+    if (serverVirtualHost != null) {
+      factory.setVirtualHost(serverVirtualHost);
+    }
 
-        try {
-            connection = factory.newConnection();
-            channel = connection.createChannel();
+    factory.setUsername(serverUsername);
+    factory.setPassword(serverPassword);
 
-            channel.exchangeDeclare(exchangeServer, exchangeType, true);
-            channel.queueDeclare(queueName, queueDurable, false, false, null);
-            channel.queueBind(queueName, exchangeServer, queueRoutingKey);
+    try {
+      connection = factory.newConnection(UUID.randomUUID().toString());
+      channel = connection.createChannel();
 
-        } catch (TimeoutException | IOException ex) {
-            throw makeIOException(ex);
-        }
-    }
+      channel.exchangeDeclare(exchangeServer, exchangeType, true);
+      channel.queueDeclare(queueName, queueDurable, false, false, null);
+      channel.queueBind(queueName, exchangeServer, queueRoutingKey);
 
-    @Override
-    public void update(NutchDocument doc) throws IOException {
-        RabbitDocument rabbitDocument = new RabbitDocument();
-
-        for (final Map.Entry<String, NutchField> e : doc) {
-            RabbitDocument.RabbitDocumentField field = new 
RabbitDocument.RabbitDocumentField(
-                    e.getKey(),
-                    e.getValue().getWeight(),
-                    e.getValue().getValues());
-            rabbitDocument.addField(field);
-        }
-        rabbitDocument.setDocumentBoost(doc.getWeight());
-
-        rabbitMessage.addDocToUpdate(rabbitDocument);
-        if(rabbitMessage.size() >= commitSize) {
-            commit();
-        }
+    } catch (TimeoutException | IOException ex) {
+      throw makeIOException(ex);
     }
-
-    @Override
-    public void commit() throws IOException {
-        if (!rabbitMessage.isEmpty()) {
-            channel.basicPublish(exchangeServer, queueRoutingKey, null, 
rabbitMessage.getBytes());
-        }
-        rabbitMessage.clear();
+  }
+
+  @Override
+  public void update(NutchDocument doc) throws IOException {
+    RabbitDocument rabbitDocument = new RabbitDocument();
+
+    for (final Map.Entry<String, NutchField> e : doc) {
+      RabbitDocument.RabbitDocumentField field = new 
RabbitDocument.RabbitDocumentField(
+              e.getKey(),
+              e.getValue().getWeight(),
+              e.getValue().getValues());
+      rabbitDocument.addField(field);
     }
+    rabbitDocument.setDocumentBoost(doc.getWeight());
 
-    @Override
-    public void write(NutchDocument doc) throws IOException {
-        RabbitDocument rabbitDocument = new RabbitDocument();
-
-        for (final Map.Entry<String, NutchField> e : doc) {
-            RabbitDocument.RabbitDocumentField field = new 
RabbitDocument.RabbitDocumentField(
-                    e.getKey(),
-                    e.getValue().getWeight(),
-                    e.getValue().getValues());
-            rabbitDocument.addField(field);
-        }
-        rabbitDocument.setDocumentBoost(doc.getWeight());
-
-        rabbitMessage.addDocToWrite(rabbitDocument);
-
-        if(rabbitMessage.size() >= commitSize) {
-            commit();
-        }
+    rabbitMessage.addDocToUpdate(rabbitDocument);
+    if (rabbitMessage.size() >= commitSize) {
+      commit();
     }
+  }
 
-    @Override
-    public void close() throws IOException {
-        commit();//TODO: This is because indexing job never call commit 
method. It should be fixed.
-        try {
-            channel.close();
-            connection.close();
-        } catch (IOException | TimeoutException e) {
-            throw makeIOException(e);
-        }
+  @Override
+  public void commit() throws IOException {
+    if (!rabbitMessage.isEmpty()) {
+      channel.basicPublish(exchangeServer, queueRoutingKey, null, 
rabbitMessage.getBytes());
+    }
+    rabbitMessage.clear();
+  }
+
+  @Override
+  public void write(NutchDocument doc) throws IOException {
+    RabbitDocument rabbitDocument = new RabbitDocument();
+
+    for (final Map.Entry<String, NutchField> e : doc) {
+      RabbitDocument.RabbitDocumentField field = new 
RabbitDocument.RabbitDocumentField(
+              e.getKey(),
+              e.getValue().getWeight(),
+              e.getValue().getValues());
+      rabbitDocument.addField(field);
     }
+    rabbitDocument.setDocumentBoost(doc.getWeight());
 
-    @Override
-    public void delete(String url) throws IOException {
-        rabbitMessage.addDocToDelete(url);
+    rabbitMessage.addDocToWrite(rabbitDocument);
 
-        if(rabbitMessage.size() >= commitSize) {
-            commit();
-        }
+    if (rabbitMessage.size() >= commitSize) {
+      commit();
     }
-
-    private static IOException makeIOException(Exception e) {
-        return new IOException(e);
+  }
+
+  @Override
+  public void close() throws IOException {
+    commit();//TODO: This is because indexing job never call commit method. It 
should be fixed.
+    try {
+      if(channel.isOpen()) {
+        channel.close();
+      }
+      if(connection.isOpen()) {
+        connection.close();
+      }
+    } catch (IOException | TimeoutException e) {
+      throw makeIOException(e);
     }
+  }
+
+  @Override
+  public void delete(String url) throws IOException {
+    rabbitMessage.addDocToDelete(url);
 
-    public String describe() {
-        return "RabbitIndexWriter\n" +
-                "\t" + serverHost +  ":" + serverPort + " : URL of RabbitMQ 
server\n" +
-                "\t" + RabbitMQConstants.SERVER_VIRTUAL_HOST + " : Virtualhost 
name\n" +
-                "\t" + RabbitMQConstants.SERVER_USERNAME + " : Username for 
authentication\n" +
-                "\t" + RabbitMQConstants.SERVER_PASSWORD + " : Password for 
authentication\n" +
-                "\t" + RabbitMQConstants.COMMIT_SIZE + " : Buffer size when 
sending to RabbitMQ (default 250)\n";
+    if (rabbitMessage.size() >= commitSize) {
+      commit();
     }
+  }
+
+  private static IOException makeIOException(Exception e) {
+    return new IOException(e);
+  }
+
+  public String describe() {
+    return "RabbitIndexWriter\n" +
+            "\t" + RabbitMQConstants.SERVER_HOST + " : Host of RabbitMQ 
server\n" +
+            "\t" + RabbitMQConstants.SERVER_PORT + " : Port of RabbitMQ 
server\n" +
+            "\t" + RabbitMQConstants.SERVER_VIRTUAL_HOST + " : Virtualhost 
name\n" +
+            "\t" + RabbitMQConstants.SERVER_USERNAME + " : Username for 
authentication\n" +
+            "\t" + RabbitMQConstants.SERVER_PASSWORD + " : Password for 
authentication\n" +
+            "\t" + RabbitMQConstants.COMMIT_SIZE + " : Buffer size when 
sending to RabbitMQ (default 250)\n";
+  }
 
 Review comment:
   For consistency, should we use the `StringBuilder` pattern here?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SolrIndexer to write to multiple servers.
> -----------------------------------------
>
>                 Key: NUTCH-1480
>                 URL: https://issues.apache.org/jira/browse/NUTCH-1480
>             Project: Nutch
>          Issue Type: Improvement
>          Components: indexer
>            Reporter: Markus Jelsma
>            Assignee: Markus Jelsma
>            Priority: Minor
>         Attachments: adding-support-for-sharding-indexer-for-solr.patch, 
> NUTCH-1480-1.6.1.patch
>
>
> SolrUtils should return an array of SolrServers and read the SolrUrl as a 
> comma-delimited list of URLs using Configuration.getString(). SolrWriter 
> should be able to handle this list of SolrServers.
> This is useful if you want to send documents to multiple servers if no 
> replication is available or if you want to send documents to multiple NOCs.
> edit:
> This does not replace NUTCH-1377 but complements it. Combined with NUTCH-1377, 
> this issue allows you to index to multiple SolrCloud clusters at the same time.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to