RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   05-Jul-2016 17:58:33
  Branch: rpm-5_4                          Handle: 2016070515583300

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               rpmmqtt.c

  Log:
    - mqtt: sketch in "sub" vs "pub" handling.

  Summary:
    Revision    Changes     Path
    1.1.2.15    +54 -29     rpm/rpmio/rpmmqtt.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmqtt.c
  ============================================================================
  $ cvs diff -u -r1.1.2.14 -r1.1.2.15 rpmmqtt.c
  --- rpm/rpmio/rpmmqtt.c       5 Jul 2016 14:49:11 -0000       1.1.2.14
  +++ rpm/rpmio/rpmmqtt.c       5 Jul 2016 15:58:33 -0000       1.1.2.15
  @@ -1595,20 +1595,19 @@
       return rc;
   }
   
  -/* XXX add subtopics arg? */
  -static rpmRC rpmmqttInitSubscribe(rpmmqtt mqtt)
  +static rpmRC rpmmqttInitSubscribe(rpmmqtt mqtt, const char ** topics)
   {
       rpmRC rc = RPMRC_FAIL;   /* assume failure */
       int xx;
   
  -    if (mqtt->subtopics) {
  -     int nsubs = argvCount((ARGV_t)mqtt->subtopics);
  +    if (topics) {
  +     int nsubs = argvCount((ARGV_t)topics);
   #ifdef       NOTYET  /* XXX MQTT segfault if done here. */
  -argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, NULL);
  -     xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)mqtt->subtopics);
  +argvPrint(__FUNCTION__, (ARGV_t)topics, NULL);
  +     xx = rpmmqttSubscribeMany(mqtt, nsubs, topics);
   #else
        for (int i = 0; i < nsubs; i++) {
  -         xx = rpmmqttSub(mqtt, mqtt->subtopics[i], 0);
  +         xx = rpmmqttSub(mqtt, topics[i], 0);
        }
   #endif
       }
  @@ -1616,10 +1615,9 @@
       return rc;
   }
   
  -/* XXX add topics arg? */
  -static rpmRC rpmmqttInitPublish(rpmmqtt mqtt)
  +static rpmRC rpmmqttInitPublish(rpmmqtt mqtt, const char ** topics)
   {
  -    int ntopics = argvCount((ARGV_t)mqtt->topics);
  +    int ntopics = argvCount((ARGV_t)topics);
       int nmsgs = argvCount((ARGV_t)mqtt->msgs);
       int nifn = argvCount(mqtt->ifn);
       const char * ifn = NULL;
  @@ -1644,7 +1642,7 @@
                xx = rpmmqttPub(mqtt, topic, s, ns);
            else
            for (int j = 0; j < ntopics; j++) {
  -             topic = mqtt->topics[j];
  +             topic = topics[j];
                xx = rpmmqttPub(mqtt, topic, s, ns);
            }
        }
  @@ -1662,7 +1660,7 @@
                    xx = rpmmqttPub(mqtt, topic, s, ns);
                else
                for (int j = 0; j < ntopics; j++) {
  -                 topic = mqtt->topics[j];
  +                 topic = topics[j];
                    xx = rpmmqttPub(mqtt, topic, s, ns);
                }
            }
  @@ -1685,7 +1683,7 @@
                        xx = rpmmqttPub(mqtt, topic, s, ns);
                    else
                    for (int k = 0; j < ntopics; j++) {
  -                     topic = mqtt->topics[k];
  +                     topic = topics[k];
                        xx = rpmmqttPub(mqtt, topic, s, ns);
                    }
                }
  @@ -1891,25 +1889,52 @@
                        onDeliveryComplete));
        }
   
  -     /* Subscribe to initial topics (if any). */
  -#ifdef       DYING
  -argvPrint("mqtt->subtopics", (ARGV_t)mqtt->subtopics, NULL);
  -     if (mqtt->subtopics) {
  -         int nsubs = argvCount((ARGV_t)mqtt->subtopics);
  -#ifdef       NOTYET  /* XXX MQTT segfault if done here. */
  -         xx = rpmmqttSubscribeMany(mqtt, nsubs, (char **)mqtt->subtopics);
  -#else
  -         for (int i = 0; i < nsubs; i++) {
  -             xx = rpmmqttSub(mqtt, mqtt->subtopics[i], 0);
  +     /* XXX If any topic has a wild card, then switch to "sub" mode. */
  +     {
  +         const char * topic;
  +         int bingo = 0;
  +     
  +         topic = mqtt->topic;
  +         if (strstr(topic, "+") || strstr(topic, "#"))
  +             bingo = 1;
  +         else {
  +             int ntopics = argvCount((ARGV_t)mqtt->topics);
  +             for (int i = 0; i < ntopics; i++) {
  +                 topic = mqtt->topics[i];
  +                 if (!(strstr(topic, "+") || strstr(topic, "#")))
  +                     continue;
  +                 bingo = 1;
  +                 break;
  +             }
  +         }
  +         if (bingo) {
  +             mqtt->_progmode = _free(mqtt->_progmode);
  +             mqtt->_progmode = xstrdup("sub");
            }
  -#endif
        }
  -#else
  -     xx = rpmmqttInitSubscribe(mqtt);
  -#endif
   
  -     /* Publish any initial messages (if any). */
  -     xx = rpmmqttInitPublish(mqtt);
  +     /* Execute in either "sub" or "pub" mode. */
  +     if (!strcmp(mqtt->_progmode, "sub")) {
  +         /* Subscribe to topics (if any). */
  +         if (mqtt->topic)
  +             xx = rpmmqttSub(mqtt, mqtt->topic, strlen(mqtt->topic));
  +         xx = rpmmqttInitSubscribe(mqtt, mqtt->subtopics);
  +         xx = rpmmqttInitSubscribe(mqtt, mqtt->topics);
  +
  +         /* Wait for messages ... */
  +         while (mqtt->max_msg_count <= 0
  +             || mqtt->msg_count < mqtt->max_msg_count)
  +         {
  +             sleep(1);
  +         }
  +     } else {
  +         /* Subscribe to initial topics (if any). */
  +         xx = rpmmqttInitSubscribe(mqtt, mqtt->subtopics);
  +
  +         /* Publish any initial messages (if any). */
  +         xx = rpmmqttInitPublish(mqtt, mqtt->topics);
  +     }
  +     
   
       }
   #endif       /* WITH_MQTT */
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to